/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #ifndef THRIFT_TMULTIPLEXEDPROCESSOR_H_ #define THRIFT_TMULTIPLEXEDPROCESSOR_H_ 1 #include #include #include #include namespace apache { namespace thrift { namespace protocol { /** * To be able to work with any protocol, we needed * to allow them to call readMessageBegin() and get a TMessage in exactly * the standard format, without the service name prepended to TMessage.name. */ class StoredMessageProtocol : public TProtocolDecorator { public: StoredMessageProtocol(std::shared_ptr _protocol, const std::string& _name, const TMessageType _type, const int32_t _seqid) : TProtocolDecorator(_protocol), name(_name), type(_type), seqid(_seqid) {} uint32_t readMessageBegin_virt(std::string& _name, TMessageType& _type, int32_t& _seqid) override { _name = name; _type = type; _seqid = seqid; return 0; // (Normal TProtocol read functions return number of bytes read) } std::string name; TMessageType type; int32_t seqid; }; } // namespace protocol /** * TMultiplexedProcessor is a TProcessor allowing * a single TServer to provide multiple services. * *

To do so, you instantiate the processor and then register additional * processors with it, as shown in the following example:

* *
* std::shared_ptr processor(new TMultiplexedProcessor()); * * processor->registerProcessor( * "Calculator", * std::shared_ptr( new CalculatorProcessor( * std::shared_ptr( new CalculatorHandler())))); * * processor->registerProcessor( * "WeatherReport", * std::shared_ptr( new WeatherReportProcessor( * std::shared_ptr( new WeatherReportHandler())))); * * std::shared_ptr transport(new TServerSocket(9090)); * TSimpleServer server(processor, transport); * * server.serve(); *
*/ class TMultiplexedProcessor : public TProcessor { public: typedef std::map > services_t; /** * 'Register' a service with this TMultiplexedProcessor. This * allows us to broker requests to individual services by using the service * name to select them at request time. * * \param [in] serviceName Name of a service, has to be identical to the name * declared in the Thrift IDL, e.g. "WeatherReport". * \param [in] processor Implementation of a service, usually referred to * as "handlers", e.g. WeatherReportHandler, * implementing WeatherReportIf interface. */ void registerProcessor(const std::string& serviceName, std::shared_ptr processor) { services[serviceName] = processor; } /** * Register a service to be called to process queries without service name * \param [in] processor Implementation of a service. */ void registerDefault(const std::shared_ptr& processor) { defaultProcessor = processor; } /** * Chew up invalid input and return an exception to throw. */ TException protocol_error(std::shared_ptr in, std::shared_ptr out, const std::string& name, int32_t seqid, const std::string& msg) const { in->skip(::apache::thrift::protocol::T_STRUCT); in->readMessageEnd(); in->getTransport()->readEnd(); ::apache::thrift::TApplicationException x(::apache::thrift::TApplicationException::PROTOCOL_ERROR, "TMultiplexedProcessor: " + msg); out->writeMessageBegin(name, ::apache::thrift::protocol::T_EXCEPTION, seqid); x.write(out.get()); out->writeMessageEnd(); out->getTransport()->writeEnd(); out->getTransport()->flush(); return TException(msg); } /** * This implementation of process performs the following steps: * *
    *
  1. Read the beginning of the message.
  2. *
  3. Extract the service name from the message.
  4. *
  5. Using the service name to locate the appropriate processor.
  6. *
  7. Dispatch to the processor, with a decorated instance of TProtocol * that allows readMessageBegin() to return the original TMessage.
  8. *
* * \throws TException If the message type is not T_CALL or T_ONEWAY, if * the service name was not found in the message, or if the service * name was not found in the service map. */ bool process(std::shared_ptr in, std::shared_ptr out, void* connectionContext) override { std::string name; protocol::TMessageType type; int32_t seqid; // Use the actual underlying protocol (e.g. TBinaryProtocol) to read the // message header. This pulls the message "off the wire", which we'll // deal with at the end of this method. in->readMessageBegin(name, type, seqid); if (type != protocol::T_CALL && type != protocol::T_ONEWAY) { // Unexpected message type. throw protocol_error(in, out, name, seqid, "Unexpected message type"); } // Extract the service name boost::tokenizer > tok(name, boost::char_separator(":")); std::vector tokens; std::copy(tok.begin(), tok.end(), std::back_inserter(tokens)); // A valid message should consist of two tokens: the service // name and the name of the method to call. if (tokens.size() == 2) { // Search for a processor associated with this service name. auto it = services.find(tokens[0]); if (it != services.end()) { std::shared_ptr processor = it->second; // Let the processor registered for this service name // process the message. return processor ->process(std::shared_ptr( new protocol::StoredMessageProtocol(in, tokens[1], type, seqid)), out, connectionContext); } else { // Unknown service. throw protocol_error(in, out, name, seqid, "Unknown service: " + tokens[0] + ". Did you forget to call registerProcessor()?"); } } else if (tokens.size() == 1) { if (defaultProcessor) { // non-multiplexed client forwards to default processor return defaultProcessor ->process(std::shared_ptr( new protocol::StoredMessageProtocol(in, tokens[0], type, seqid)), out, connectionContext); } else { throw protocol_error(in, out, name, seqid, "Non-multiplexed client request dropped. " "Did you forget to call defaultProcessor()?"); } } else { throw protocol_error(in, out, name, seqid, "Wrong number of tokens."); } } private: /** Map of service processor objects, indexed by service names. */ services_t services; //! If a non-multi client requests something, it goes to the //! default processor (if one is defined) for backwards compatibility. std::shared_ptr defaultProcessor; }; } } #endif // THRIFT_TMULTIPLEXEDPROCESSOR_H_