/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package spark.streaming.receivers import spark.streaming.receivers._ import com.rabbitmq.client.ConnectionFactory import com.rabbitmq.client.Channel import com.rabbitmq.client.DefaultConsumer import com.rabbitmq.client.Envelope import com.rabbitmq.client.AMQP.BasicProperties import com.thenewmotion.akka.rabbitmq._ import akka.actor.Actor import akka.actor.ActorRef import akka.actor.ActorSystem import akka.actor.Props import spark.Logging /** * A receiver to subscribe to RabbitMQ stream. 
*
* On start, the receiver opens a connection through an akka-rabbitmq
* `ConnectionActor`, declares a server-named queue, binds it, and consumes from it
* with automatic acknowledgement. Every delivered body is decoded as a UTF-8 string
* and handed to `pushBlock`.
*
* Each instance owns a private `ActorSystem` that hosts the connection and channel
* actors. That system is shut down in `postStop`, so stopping (or restarting) the
* receiver does not leak its threads or its broker connection.
*
* @tparam T     not referenced by this class; payloads are always pushed as `String`
* @param uname  user name set on the RabbitMQ `ConnectionFactory`
* @param passwd password set on the RabbitMQ `ConnectionFactory`
* @param vhost  virtual host set on the RabbitMQ `ConnectionFactory`
* @param host   broker host set on the RabbitMQ `ConnectionFactory`
* @param qname  NOTE: despite its name, this is passed to `Channel.queueBind` as the
*               exchange argument; the queue itself is declared without a name
* @param topic  routing key passed to `Channel.queueBind`
*/
class RMQReceiver[T: ClassManifest](uname: String,
                                    passwd: String,
                                    vhost: String,
                                    host: String,
                                    qname: String,
                                    topic: String)
  extends Actor with Receiver with Logging {

  // Private actor system for the akka-rabbitmq actors. It is created per receiver
  // instance and therefore must be released in postStop().
  implicit val system: ActorSystem = ActorSystem()

  val factory = new ConnectionFactory()
  factory.setUsername(uname)
  factory.setPassword(passwd)
  factory.setVirtualHost(vhost)
  factory.setHost(host)

  /** Creates the connection and channel actors and starts consuming. */
  override def preStart(): Unit = {
    val connectionActor = system.actorOf(Props(new ConnectionActor(factory)))

    // Invoked by the ChannelActor with the channel it manages.
    def setupChannel(channel: Channel): Unit = {
      // No-argument queueDeclare: the broker picks the queue name.
      val queue = channel.queueDeclare().getQueue()
      // Argument order is (queue, exchange, routingKey): `qname` is the exchange here.
      channel.queueBind(queue, qname, topic)

      val consumer = new DefaultConsumer(channel) {
        // NOTE(review): this callback is invoked by the RabbitMQ client, presumably
        // on one of its own threads rather than the actor's - confirm that
        // pushBlock is safe to call from there.
        override def handleDelivery(consumerTag: String,
                                    envelope: Envelope,
                                    properties: BasicProperties,
                                    body: Array[Byte]): Unit = {
          pushBlock(fromBytes(body))
        }
      }
      // Second argument `true` = autoAck: deliveries are acknowledged automatically.
      channel.basicConsume(queue, true, consumer)
    }

    // The returned channel ActorRef is not needed; the call is made for its effect.
    connectionActor.createChannel(Props(new ChannelActor(setupChannel)))
  }

  /**
   * Shuts down the private actor system created by this instance, stopping the
   * connection and channel actors running inside it.
   */
  override def postStop(): Unit = {
    try system.shutdown()
    finally super.postStop()
  }

  /** Decodes a message body as a UTF-8 string. */
  def fromBytes(x: Array[Byte]): String = new String(x, "UTF-8")

  /** This actor expects no messages; anything it receives is only logged. */
  def receive: Receive = {
    case _ => logInfo("unknown message")
  }
}