How to connect Apache Spark


#1

Is Spark not supported?


#2

I’ve seen some people talk about connecting Spark, so I know it works. I haven’t been able to get it working myself (yet).


#3

Hi,

Assuming you already have Spark installed, the following is a simple set of steps for basic integration.

Download the MapD JDBC driver mapdjdbc-1.0-SNAPSHOT-jar-with-dependencies.jar from S3.

For a simple setup, copy the downloaded file to your spark-shell bin directory:

cp <downloaddir>/mapdjdbc-1.0-SNAPSHOT-jar-with-dependencies.jar <mydir>/spark/spark-2.2.0-bin-hadoop2.7/bin

cd to the spark bin directory

cd <mydir>/spark/spark-2.2.0-bin-hadoop2.7/bin

Start the spark-shell with additional parameters to place the MapD JDBC driver on the classpath:

./spark-shell --driver-class-path mapdjdbc-1.0-SNAPSHOT-jar-with-dependencies.jar --jars mapdjdbc-1.0-SNAPSHOT-jar-with-dependencies.jar

From the spark-shell prompt (scala>), load one of the example datasets into a DataFrame:

val peopleDF = spark.read.format("json").load("../examples/src/main/resources/people.json")

Output should look like this:

peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

Display the contents of the DataFrame:

peopleDF.show()

The output should look like this:

scala> peopleDF.show();
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

Confirm your MapD database is running and that you don’t already have a table called people. Run the following command on the machine running the MapD server, not from the spark-shell.

myprompt$ build/bin/mapdql -p HyperInteractive
User mapd connected to database mapd
mapdql> \t
flights_2008_10k
mapdql>

Now go back to your spark-shell and run a DataFrame write command. The options will need to be tweaked to use the hostname, port, user, and password of your MapD instance.

peopleDF.write.format("jdbc").
    option("url", "jdbc:mapd:localhost:9091:mapd").
    option("driver", "com.mapd.jdbc.MapDDriver").
    option("dbtable", "people").
    option("user", "mapd").
    option("password", "HyperInteractive").
    save();
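If the target table already exists, this save will fail by default. A minimal variant using Spark’s standard SaveMode API (not MapD-specific, and untested here against an existing MapD table) would set the mode explicitly:

peopleDF.write.format("jdbc").
    option("url", "jdbc:mapd:localhost:9091:mapd").
    option("driver", "com.mapd.jdbc.MapDDriver").
    option("dbtable", "people").
    option("user", "mapd").
    option("password", "HyperInteractive").
    mode("append").
    save();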

Return to mapdql and view the available tables

mapdql> \t
flights_2008_10k
people
mapdql>

Check the schema and contents of the newly created people table:

mapdql> \d people
CREATE TABLE people (
age BIGINT,
name TEXT ENCODING DICT(32))

mapdql> select *  from people;
age|name
NULL|Michael
30|Andy
19|Justin
mapdql>

We have successfully loaded MapD from a Spark session in two commands (the show() doesn’t really count).
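Reading a MapD table back into a Spark DataFrame uses the same driver and options; a minimal sketch, assuming the driver handles standard JDBC reads:

val peopleFromMapD = spark.read.format("jdbc").
    option("url", "jdbc:mapd:localhost:9091:mapd").
    option("driver", "com.mapd.jdbc.MapDDriver").
    option("dbtable", "people").
    option("user", "mapd").
    option("password", "HyperInteractive").
    load()

peopleFromMapD.show()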

regards


#4

Oh~ It’s nice.
Thank you for your help. I will try it.


#5

Never mind, solved this part.

My problems continue in the next post. Hopefully this marks the conclusion of me pestering you :frowning:


#6

I’m trying to write the DataFrame, but it says:

java.sql.SQLException: Connection failed - org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused (Connection refused)

Any ideas?

This is my code:

%python
uncomp_venues.write.format("jdbc").option("url", "jdbc:mapd:localhost:8443:mapd").option("driver", "com.mapd.jdbc.MapDDriver").option("dbtable", "randomname").option("user", "mapd").option("password", "").save()


#7

Here is the long version of the error:

---------------------------------------------------------------------------

Py4JJavaError Traceback (most recent call last)
in ()
----> 1 uncomp_venues.write.format("jdbc").option("url", "jdbc:mapd:localhost:8443:mapd").option("driver", "com.mapd.jdbc.MapDDriver").option("dbtable", "venues").option("user", "mapd").option("password", "").save()

/databricks/spark/python/pyspark/sql/readwriter.pyc in save(self, path, format, mode, partitionBy, **options)
591 self.format(format)
592 if path is None:
–> 593 self._jwrite.save()
594 else:
595 self._jwrite.save(path)

/databricks/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in call(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
—> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 “An error occurred while calling {0}{1}{2}.\n”.
–> 319 format(target_id, “.”, name), value)
320 else:
321 raise Py4JError(

Py4JJavaError: An error occurred while calling o842.save.
: java.sql.SQLException: Connection failed - org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused (Connection refused)
at com.mapd.jdbc.MapDConnection.(MapDConnection.java:109)
at com.mapd.jdbc.MapDDriver.connect(MapDDriver.java:52)
at org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:61)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:52)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:58)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:490)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:94)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)


#8

Hi

Try changing your port from 8443 to 9091; the driver you are using expects to talk to Thrift over the binary protocol, not HTTP.
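For example, with everything else unchanged, the write pointed at the default binary port would look like this (a sketch in the same chained form as the earlier spark-shell example; swap in your own table name and credentials):

uncomp_venues.write.format("jdbc").
    option("url", "jdbc:mapd:localhost:9091:mapd").
    option("driver", "com.mapd.jdbc.MapDDriver").
    option("dbtable", "randomname").
    option("user", "mapd").
    option("password", "").
    save()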

Regards


#9

There isn’t anything listening on the host port you are calling; port 8443 isn’t the default for any MapD service.
Try using the default port of the MapD database.


#10

Okay I changed it to 9091, looks like the same error:

Py4JJavaError Traceback (most recent call last)
in ()
----> 1 uncomp_venues.write.format("jdbc").option("url", "jdbc:mapd:localhost:9091:mapd").option("driver", "com.mapd.jdbc.MapDDriver").option("dbtable", "gobbledeedook").option("user", "mapd").option("password", "passwordtypedhere").save()

/databricks/spark/python/pyspark/sql/readwriter.pyc in save(self, path, format, mode, partitionBy, **options)
591 self.format(format)
592 if path is None:
–> 593 self._jwrite.save()
594 else:
595 self._jwrite.save(path)

/databricks/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in call(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
—> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 “An error occurred while calling {0}{1}{2}.\n”.
–> 319 format(target_id, “.”, name), value)
320 else:
321 raise Py4JError(

Py4JJavaError: An error occurred while calling o1103.save.
: java.sql.SQLException: Connection failed - org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused (Connection refused)
at com.mapd.jdbc.MapDConnection.(MapDConnection.java:109)
at com.mapd.jdbc.MapDDriver.connect(MapDDriver.java:52)
at org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:61)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:52)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:58)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:490)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:94)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)


#11

In case it matters, I’m using Databricks 3.0 (includes Apache Spark 2.2.0, Scala 2.11).

Is it possible the Databricks servers have to be in the same region as the MapD server I set up in order for it to work?


#12

Hi,

If they are not on the same machine, you will have to change the server name from localhost to the real server name in the connection URL:

jdbc:mapd:<REAL_HOST_NAME_HERE>:9091:mapd

Regards


#13

Dwayne, I’m sorry you have to deal with someone like me, who is so much less informed than you. Thank you again for your patience. I hope this will work soon.

I changed the localhost to the public DNS for the instance I’m running. This time the error is a little more hopeful:

java.sql.SQLException: Query failed : Syntax error at: " sql was 'CREATE TABLE gobbledeedook ("id" TEXT , "address" TEXT , "createdTime" TIMESTAMP NOT NULL, "chainId" TEXT , "contactUrl" TEXT , "description" TEXT , "keyword" TEXT , "modifiedTime" TIMESTAMP NOT NULL, "name" TEXT , "postcode" TEXT , "seasonality" INTEGER , "slug" TEXT , "spaceId" TEXT , "venueIdOfPreviousOccupant" TEXT , "website" TEXT , "latitude" DOUBLE PRECISION , "longitude" DOUBLE PRECISION , "numMedia" INTEGER , "numNewsletters" INTEGER , "hasFacebook" BIT(1) , "hasInsta" BIT(1) , "placeTypeName" TEXT , "statusTypeName" TEXT , "primtid" TEXT , "menuCheckTime" TIMESTAMP , "jumpNote" TEXT , "inGeneric" BIT(1) , "setupTime" TIMESTAMP , "jumpedTime" TIMESTAMP , "spoonedTime" TIMESTAMP , "verifiedTime" TIMESTAMP , "offeringsValue" INTEGER , "TagName" TEXT , "offeringTag" TEXT , "boundaryName" TEXT NOT NULL, "boundaryType" INTEGER , "boundaryAdminLevel" INTEGER ) '

The full error is:


Py4JJavaError Traceback (most recent call last)
in ()
----> 1 uncomp_venues.write.format("jdbc").option("url", "jdbc:mapd:PUBLICDNSfromEC2:9091:mapd").option("driver", "com.mapd.jdbc.MapDDriver").option("dbtable", "gobbledeedook").option("user", "mapd").option("password", "").save()

/databricks/spark/python/pyspark/sql/readwriter.pyc in save(self, path, format, mode, partitionBy, **options)
591 self.format(format)
592 if path is None:
–> 593 self._jwrite.save()
594 else:
595 self._jwrite.save(path)

/databricks/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in call(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
—> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 “An error occurred while calling {0}{1}{2}.\n”.
–> 319 format(target_id, “.”, name), value)
320 else:
321 raise Py4JError(

Py4JJavaError: An error occurred while calling o1244.save.
: java.sql.SQLException: Query failed : Syntax error at: " sql was 'CREATE TABLE gobbledeedook ("id" TEXT , "address" TEXT , "createdTime" TIMESTAMP NOT NULL, "chainId" TEXT , "contactUrl" TEXT , "description" TEXT , "keyword" TEXT , "modifiedTime" TIMESTAMP NOT NULL, "name" TEXT , "postcode" TEXT , "seasonality" INTEGER , "slug" TEXT , "spaceId" TEXT , "venueIdOfPreviousOccupant" TEXT , "website" TEXT , "latitude" DOUBLE PRECISION , "longitude" DOUBLE PRECISION , "numMedia" INTEGER , "numNewsletters" INTEGER , "hasFacebook" BIT(1) , "hasInsta" BIT(1) , "placeTypeName" TEXT , "statusTypeName" TEXT , "primtid" TEXT , "menuCheckTime" TIMESTAMP , "jumpNote" TEXT , "inGeneric" BIT(1) , "setupTime" TIMESTAMP , "jumpedTime" TIMESTAMP , "spoonedTime" TIMESTAMP , "verifiedTime" TIMESTAMP , "offeringsValue" INTEGER , "TagName" TEXT , "offeringTag" TEXT , "boundaryName" TEXT NOT NULL, "boundaryType" INTEGER , "boundaryAdminLevel" INTEGER ) '
at com.mapd.jdbc.MapDStatement.executeUpdate(MapDStatement.java:83)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:805)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:90)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:490)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:94)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)


#14

Hi,

Please confirm you are using the JDBC driver pointed to here.

The error suggests you are running with an older JDBC driver.

Regards


#15

Confirmed. I re-downloaded the JAR you pointed to, re-uploaded it to Databricks, and restarted the cluster to be sure. Same error.


#16

Is there any simple error I’ve made that could be causing this? Perhaps something to do with setup in Databricks: https://docs.databricks.com/user-guide/clusters/jdbc-odbc.html ?


#17

Hi,

The error messages you are seeing tell me you are not running the JDBC driver mentioned, as the line numbers do not sync up. I suspect you have an older driver somewhere in your Spark classpath that is causing the issue. You need to remove it and then try again.

Ultimately you are going to hit a different issue, as MapD does not currently support the BIT(1) column syntax (we expect BOOLEAN as the type), so another change will need to be made in the driver to support this.
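If you don’t want to wait for a driver change, one possible workaround (a minimal sketch, not something we have tested against MapD) is to register a custom Spark JdbcDialect that maps Spark’s BooleanType to BOOLEAN instead of the default BIT(1):

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types._

// Hypothetical dialect: emit BOOLEAN for boolean columns on MapD JDBC URLs
object MapDDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:mapd")
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
    case _ => None
  }
}

JdbcDialects.registerDialect(MapDDialect)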

Regards


#18

Okay I’ll start digging for that. I’ll just remove those BIT syntax columns for now, or try and manually make them Boolean.
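For reference, a rough sketch of the interim approach (shown in Scala, matching the earlier examples; the column names are the ones from the error above) is to cast the boolean columns to integers before writing, so the generated DDL avoids BIT(1):

import org.apache.spark.sql.functions.col

// Cast the boolean columns so Spark maps them to INTEGER instead of BIT(1)
val venuesNoBits = uncomp_venues
  .withColumn("hasFacebook", col("hasFacebook").cast("int"))
  .withColumn("hasInsta", col("hasInsta").cast("int"))
  .withColumn("inGeneric", col("inGeneric").cast("int"))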


#19

Dwayne Berry, or whatever your real name is, thank you. It works. It finally flipping works.

For anyone running into a classpath issue on Databricks, here is the code I used to remove old JARs lingering in the classpath (Scala):

val jarfiles = dbutils.fs.ls("dbfs:/FileStore/jars")
  .map(_.path)
  .filter(_.indexOf("your pattern") > -1)

jarfiles.foreach(dbutils.fs.rm(_))

Using the pattern "mapd" in place of "your pattern", I was able to remove all MapD JARs. I then re-uploaded the JAR and ran it again. Bingo, bango.
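If you want to double-check that nothing is left behind before re-uploading, a quick check with the same dbutils API (a sketch; adjust the pattern as needed):

// List any remaining MapD JARs in the Databricks jar store
dbutils.fs.ls("dbfs:/FileStore/jars")
  .map(_.path)
  .filter(_.contains("mapd"))
  .foreach(println)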