Spark Python Avro Kafka Deserialiser
Solution 1:
I had the same challenge (deserializing Avro messages from Kafka in PySpark) and solved it with the decode_message method of the Confluent Schema Registry module's MessageSerializer class, since in our case the schema is stored in a Confluent Schema Registry.
You can find that module at https://github.com/verisign/python-confluent-schemaregistry
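from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from confluent.schemaregistry.client import CachedSchemaRegistryClient
from confluent.schemaregistry.serializers import MessageSerializer
schema_registry_client = CachedSchemaRegistryClient(url='http://xx.xxx.xxx:8081')
serializer = MessageSerializer(schema_registry_client)
# simple decoder to replace Kafka streaming's built-in UTF-8 decoder
def decoder(s):
    decoded_message = serializer.decode_message(s)
    return decoded_message
# placeholder app name and 5-second batch interval; adjust to your job
ssc = StreamingContext(SparkContext(appName="avro-kafka-decoder"), 5)
kvs = KafkaUtils.createDirectStream(ssc, ["mytopic"], {"metadata.broker.list": "xxxxx:9092,yyyyy:9092"}, valueDecoder=decoder)
# each element is a (key, decoded_value) tuple; keep only the value
lines = kvs.map(lambda x: x[1])
lines.pprint()
ssc.start()
ssc.awaitTermination()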
As you can see, this code uses the new direct approach with no receivers, hence createDirectStream (see more at https://spark.apache.org/docs/1.5.1/streaming-kafka-integration.html).
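For context, values written by Confluent's Avro serializer are not bare Avro: each one starts with a magic byte 0 and a 4-byte big-endian schema ID, followed by the Avro binary payload. decode_message reads that ID and uses it to look up the writer schema in the registry. The sketch below only splits off that header with the standard library, to show the framing; split_confluent_header is a hypothetical helper name, not part of either library.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # first byte of every Confluent-framed message


def split_confluent_header(raw: bytes) -> Tuple[int, bytes]:
    """Return (schema_id, avro_payload) for a Confluent-framed message."""
    if len(raw) < 5:
        raise ValueError("message too short for Confluent framing")
    # ">bI" = big-endian: 1-byte magic marker + unsigned 4-byte schema ID
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}")
    return schema_id, raw[5:]


# Schema ID 42 followed by a made-up 3-byte Avro payload
framed = bytes([0, 0, 0, 0, 42]) + b"\x02\x04\x06"
print(split_confluent_header(framed))
```

The remaining payload is what a schemaless Avro reader would then decode, using the schema registered under that ID.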
Solution 2:
As @Zoltan Fedor mentioned in the comments, the answer above is a bit old now, as 2.5 years have passed since it was written. The confluent-kafka-python library has since evolved to support the same functionality natively. The only change needed in the given code is the imports:
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
Then you can change the createDirectStream line to pass the serializer's decode_message method directly as the value decoder:
kvs = KafkaUtils.createDirectStream(ssc, ["mytopic"], {"metadata.broker.list": "xxxxx:9092,yyyyy:9092"}, valueDecoder=serializer.decode_message)
I have tested it and it works nicely. I am adding this answer for anyone who may need it in the future.
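Whatever you pass as valueDecoder is called with the raw message bytes, and PySpark's default utf8_decoder returns None when the value itself is None (for example a tombstone record). If the decoder you plug in does not already guard against that, a small wrapper can mirror the default behaviour. This is a minimal sketch assuming you want null values passed through rather than raised; none_safe is a hypothetical helper, and the UTF-8 lambda is only a stand-in for serializer.decode_message.

```python
from typing import Any, Callable, Optional


def none_safe(decode: Callable[[bytes], Any]) -> Callable[[Optional[bytes]], Any]:
    """Wrap a bytes decoder so a None value (e.g. a tombstone) stays None,
    mirroring the None check in PySpark's default utf8_decoder."""
    def wrapped(raw: Optional[bytes]) -> Any:
        if raw is None:
            return None
        return decode(raw)
    return wrapped


# Stand-in decoder for illustration; in a real job you would wrap
# serializer.decode_message instead.
decoder = none_safe(lambda raw: raw.decode("utf-8"))
print(decoder(b"hello"), decoder(None))
```

In the streaming job this would read valueDecoder=none_safe(serializer.decode_message).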
Solution 3:
If you don't use the Confluent Schema Registry and instead have the schema in a text file or a dict, you can use the fastavro Python package to decode the Avro messages in your Kafka stream:
from pyspark.sql import SparkSession
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext
import io
import fastavro
# here should be your schema; it is parsed once here instead of on every message
schema = fastavro.parse_schema({
    "namespace": "...",
    "type": "...",
    "name": "...",
    "fields": [
        {
            "name": "...",
            "type": "..."
        },
        # ... more fields
    ]
})
def decoder(msg):
    # the message value is bare Avro binary, so read it without a container header
    bytes_io = io.BytesIO(msg)
    msg_decoded = fastavro.schemaless_reader(bytes_io, schema)
    return msg_decoded
session = SparkSession.builder \
    .appName("Kafka Spark Streaming Avro example") \
    .getOrCreate()
streaming_context = StreamingContext(sparkContext=session.sparkContext,
                                     batchDuration=5)
kafka_stream = KafkaUtils.createDirectStream(ssc=streaming_context,
                                             topics=['your_topic_1', 'your_topic_2'],
                                             kafkaParams={"metadata.broker.list": "your_kafka_broker_1,your_kafka_broker_2"},
                                             valueDecoder=decoder)
# each element is a (key, decoded_value) tuple; print the decoded values
kafka_stream.map(lambda x: x[1]).pprint()
streaming_context.start()
streaming_context.awaitTermination()
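fastavro.schemaless_reader expects the bare Avro binary encoding, with no file header and no embedded schema, which is why the schema has to be supplied separately. To show what that encoding looks like, here is a hand-rolled, standard-library-only decoder for a hypothetical record schema with a string field "name" and a long field "age". It handles only those two types and is an illustration, not a replacement for fastavro. Per the Avro specification, longs are zig-zag encoded variable-length integers, strings are a long byte count followed by UTF-8 bytes, and record fields are written back to back in schema order.

```python
import io

# Hypothetical schema used only for this illustration.
SCHEMA = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "long"},
    ],
}


def read_long(buf: io.BytesIO) -> int:
    """Read a zig-zag encoded variable-length Avro long."""
    shift = 0
    accum = 0
    while True:
        byte = buf.read(1)
        if not byte:
            raise EOFError("truncated varint")
        b = byte[0]
        accum |= (b & 0x7F) << shift  # low 7 bits carry data
        if not b & 0x80:  # high bit clear means this was the last byte
            break
        shift += 7
    # undo zig-zag: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
    return (accum >> 1) ^ -(accum & 1)


def read_string(buf: io.BytesIO) -> str:
    """Read an Avro string: a long byte count followed by UTF-8 bytes."""
    length = read_long(buf)
    return buf.read(length).decode("utf-8")


READERS = {"long": read_long, "string": read_string}


def decode_record(msg: bytes, schema: dict) -> dict:
    """Decode a schemaless record whose fields are all 'string' or 'long'."""
    buf = io.BytesIO(msg)
    # fields carry no names or tags on the wire, only schema order
    return {f["name"]: READERS[f["type"]](buf) for f in schema["fields"]}


# "Ada": length 3 zig-zags to 0x06; age 36 zig-zags to 72 (0x48)
encoded = b"\x06Ada\x48"
print(decode_record(encoded, SCHEMA))  # {'name': 'Ada', 'age': 36}
```

For the same bytes and schema, fastavro.schemaless_reader(io.BytesIO(encoded), SCHEMA) should return the same dict; this is also why a message that still carries the 5-byte Confluent header cannot be read this way without stripping the header first.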