Tapestry Magic #15: Integration with cometd

Every time I work with Tapestry, I wonder how people can use Java and not use Tapestry. It is so much fun, but then, not everybody wants to have fun while working!!. Recently there was a jira filed for integration with cometd-java. I had never used cometd but thought of giving it a try. Integration was easy but testing it with gradle and eclipse was a headache and consumed much of my Sunday.

This integration will let you expose cometd services from Tapestry. These services will not be tapestry-ioc services, but will be able to access any tapestry-ioc services or objects using @Inject and co. annotations.

Implementation

Cometd-java uses servlets to initialize and expose its services. I have separated the two jobs. The initialization of BayeuxServer is done by BayeuxServiceSource and the services are exposed by an CometdRequestFilter, an implementation of HttpServletRequestFilter. The code below is just copy-paste stuff taken from CometdServlet.java and AnnotationCometdServlet.java

First the easy part, the CometdRequestFilter

public class CometdRequestFilter implements HttpServletRequestFilter {

   private BayeuxServerSource bayeuxServerSource;
   private String path;
   private Logger logger;

   public CometdRequestFilter(BayeuxServerSource bayeuxServerSource,
         RequestGlobals requestGlobals, Logger logger,
         @Symbol(CometdConstants.CONTEXT_PATH) @Inject String path) {
      this.bayeuxServerSource = bayeuxServerSource;
      this.path = path.toLowerCase();
      this.logger = logger;

      try {
         bayeuxServerSource.startServer();
         logger.debug("Allowed protocols are : " + bayeuxServerSource.getAllowedTransports());
      } catch (Exception e) {
         e.printStackTrace();
         throw new RuntimeException("Could not start Bayeux Server: "
               + e.getMessage(), e);
      }
   }

   private void setServiceOptions(HttpServletRequest request,
         HttpServletResponse response) {

   }

   public boolean service(HttpServletRequest request,
         HttpServletResponse response, HttpServletRequestHandler handler)
         throws IOException {
      //Not my path, so don't handle it
      if (!request.getRequestURI().startsWith(request.getContextPath() + path)) {
         logger.debug("Skipping " + request.getRequestURI() + " not matching "
               + path);
         return handler.service(request, response);
      }

      logger.debug("Processing request : " + request.getRequestURI());
      if ("OPTIONS".equals(request.getMethod())) {
         setServiceOptions(request, response);
         return true;
      }

      HttpTransport transport = null;
      
      for (HttpTransport allowedTransport : bayeuxServerSource.getAllowedTransports()) {
         if (allowedTransport != null && allowedTransport.accept(request)) {
            transport = allowedTransport;
            break;
         }
      }

      if (transport == null) {
         logger.error("Request " + request.getRequestURI()
               + " Unknown Bayeux Transport");
         response.sendError(HttpServletResponse.SC_BAD_REQUEST,
               "Unknown Bayeux Transport");
      } else {
         BayeuxServerImpl bayeux = bayeuxServerSource.getBayeuxImpl();
         try {
            bayeux.setCurrentTransport(transport);
            transport.setCurrentRequest(request);
            transport.handle(request, response);
         } catch (ServletException e) {
            throw new IOException(e);
         } finally {
            transport.setCurrentRequest(null);
            if (bayeux != null) {
               bayeux.setCurrentTransport(null);
            }
         }
      }

      return true;
   }

}

This filter is used to intercept all requests and if the request pertains to our sub-context/path, we consider it as a cometd request, otherwise we ask the remaining chain to handle it. In the constructor, we start the server. We don’t have to worry about the cleanup here as it is handled by the service itself.

While handling the request in service() method, we check if this is an allowed transport type and in case it is we ask the transport to handle the request. The Bayeux Server related details are placed in BayeuxServerSource.

public interface BayeuxServerSource {
   
   BayeuxServer getBayeux();
   
   BayeuxServerImpl getBayeuxImpl();
   
   List<HttpTransport> getAllowedTransports();

   void startServer();
}

The bayeux implementation is exposed here as it is required by the filter. BayeuxServerSource service is implemented as

public class BayeuxServerSourceImpl implements BayeuxServerSource,
      RegistryShutdownListener {

   private BayeuxServerImpl bayeux;

   private List<HttpTransport> allowedTransports;

   private List<Object> annotationServices;

   private ServerAnnotationProcessor annotationProcessor;

   private ObjectLocator locator;

   private List<BayeuxServerConfigurer> configurers;

   private Logger logger;

   private boolean initialized;

   public BayeuxServerSourceImpl(List<BayeuxServerConfigurer> configurers,
         ObjectLocator locator, Logger logger) {
      bayeux = newBayeuxServer();
      this.locator = locator;
      this.configurers = configurers;
      this.logger = logger;
      annotationServices = new ArrayList<Object>();
      allowedTransports = new ArrayList<HttpTransport>();
      initialized = false;
   }

   public synchronized void startServer(){
      if(initialized){
         throw new RuntimeException("The Bayeux Server is already started");
      }
      
      
      List<Class<?>> annotationServiceClasses = new ArrayList<Class<?>>();
      logger.debug("Configuring Bayeux Server using configurers");
      for(final BayeuxServerConfigurer configurer: configurers){
         configurer.configure(bayeux, annotationServiceClasses);
      }
      
      try {
         logger.debug("Starting server");
         bayeux.start();
      } catch (Exception e) {
         throw new RuntimeException(e);
      }
      
      addAllowedTransports();
      
      setupAnnotatedServices(annotationServiceClasses);
      
      initialized = true;
   }

   private void addAllowedTransports() {
      logger.debug("Adding allowed transports");
      for (String transportName : bayeux.getAllowedTransports()) {
         ServerTransport transport = bayeux.getTransport(transportName);
         if (transport instanceof HttpTransport) {
            logger.debug("Adding transport " + transportName);
            allowedTransports.add((HttpTransport) transport);
         }
      }
   }

   private void setupAnnotatedServices(List<Class<?>> annotationServiceClasses) {
      logger.debug("Setting annotation services processor");
      annotationProcessor = new ServerAnnotationProcessor(getBayeux());

      for (Class<?> service : annotationServiceClasses) {
         Object object = locator.autobuild(service);
         logger.info("Building service for interface " + service);
         annotationProcessor.process(object);
         annotationServices.add(object);
      }
   }

   protected BayeuxServerImpl newBayeuxServer() {
      return new BayeuxServerImpl();
   }

   public BayeuxServer getBayeux() {
      return bayeux;
   }

   public BayeuxServerImpl getBayeuxImpl() {
      return bayeux;
   }

   public List<HttpTransport> getAllowedTransports() {
      return allowedTransports;
   }

   public void registryDidShutdown() {
      if (!initialized) {
         return;
      }
      cleanupAnnotatedServices();
      cleanupSessions();

      try {
         bayeux.stop();
      } catch (Exception e) {
         throw new RuntimeException(e);
      } finally {
         bayeux = null;
      }

      allowedTransports.clear();
   }

   private void cleanupAnnotatedServices() {
      if (annotationProcessor != null) {
         for (Object service : annotationServices) {
            logger.debug("Deprocessing " + service);
            annotationProcessor.deprocess(service);
         }
      }
   }

   private void cleanupSessions() {
      for (ServerSession session : bayeux.getSessions()) {
         logger.debug("Cleaning up session : " + session);
         ((ServerSessionImpl) session).cancelSchedule();
      }

   }
}

This is just startup and cleanup stuff for the service. The cometd annotation support has also been incorporated. The only magic from Tapestry side is ObjectLocator.autobuild(). This method does all the injection stuff and gives us an instance which is then passed on to the cometd’s ServerAnnotationProvider for further processing.

The contributions/initialization to BayeuxServerSource are made through BayeuxServerConfigurer.

public interface BayeuxServerConfigurer {
   void configure(BayeuxServerImpl bayeuxServerImpl, List<Class<?>> annotatedServices);
}

Finally, the module file

public class TapestryCometdModule {

   public BayeuxServerSource buildBayeuxServerSource(
         List<BayeuxServerConfigurer> configurers, ObjectLocator locator,
         Logger logger,
         RegistryShutdownHub registryShutdownHub) {
      BayeuxServerSourceImpl impl = new BayeuxServerSourceImpl(configurers, locator, logger);
      registryShutdownHub.addRegistryShutdownListener(impl);
      return impl;
   }

   public static void contributeHttpServletRequestHandler(
         OrderedConfiguration<HttpServletRequestFilter> filters) {
      filters.addInstance("Cometd", CometdRequestFilter.class);
   }

   public BayeuxServer buildBayeuxServer(BayeuxServerSource bayeuxServerSource,
         PropertyShadowBuilder shadowBuilder) {
      return shadowBuilder.build(bayeuxServerSource, "bayeux",
            BayeuxServer.class);
   }

   public static void contributeFactoryDefaults(
         MappedConfiguration<String, String> configuration) {
      configuration.add(CometdConstants.CONTEXT_PATH, "/cometd/");
   }

}

There is not much going on here, we create an instance of BayeuxServerSource, pass it to RegistryShutdownHub for proper cleanup and expose two of its methods as services using ShadowBuilder.

Usage

Usage is independent of Tapestry. We can use any of the cometd clients. I will discuss its java-client usage as I have already written such a client for testing purposes.

We first create a service

@Service("helloService")
public class HelloService {
   @SuppressWarnings("unused")
   @Inject
   private BayeuxServer bayeux;
   
   @Session
   private ServerSession serverSession;
   
   @Configure({"/service/hello"})
   public void configure(ConfigurableServerChannel channel){
      channel.setLazy(true);
      channel.addAuthorizer(GrantAuthorizer.GRANT_PUBLISH);
      channel.addAuthorizer(GrantAuthorizer.GRANT_SUBSCRIBE);
   }
   
   @Listener("/service/hello")
   public void echo(ServerSession remote, ServerMessage.Mutable message){
      Map<String, Object> input = message.getDataAsMap();
      String name = (String)input.get("name");
      
      Map<String, Object> output = new HashMap<String, Object>();
      output.put("greeting", "Hello, " + name); 
      remote.deliver(serverSession, "/hello", output, null);
   }
   
}

This example is from the standard official documentation. It configures a service “/service/hello” by granting publishing and subscribing authorities. It also adds a listener for it which looks for an parameter ‘name’ in the published message and delivers a hello message to “/hello” service.

The client connects to the server and subscribes to “/hello” service. It, then, publishes an message to “/service/hello” and checks if the subscribed service delivers a proper response.

public class JavaClientBasedTest extends Assert {
   private Server server;
   private static final Logger logger = LoggerFactory.getLogger(JavaClientBasedTest.class);
   private String greeting;

   @BeforeClass
   public void startupServer() throws Exception {
      server = new Server(9990);
      server.setStopAtShutdown(true);
      server.setGracefulShutdown(1000);

      WebAppContext context = new WebAppContext();

      context.setContextPath("/test");
      context.setWar("src/test/webapp");

      server.setHandler(context);
      server.start();
      logger.info("Server started");
   }

   @Test
   public void testJavaClient() throws Exception {
      HttpClient httpClient = new HttpClient();
      httpClient.setConnectTimeout(100000);
      httpClient.start();
      Map<String, Object> options = new HashMap<String, Object>();

      ClientTransport transport = LongPollingTransport.create(options,
            httpClient);
      final BayeuxClient client = new BayeuxClient(
            "http://localhost:9990/test/cometd/", transport);
      client.handshake();
      assertTrue(client.waitFor(1000, BayeuxClient.State.CONNECTED));

      assertNull(greeting);
      client.batch(new Runnable() {

         public void run() {
            logger.info("Subscribing to /hello");

            // /Subscribe
            client.getChannel("/hello").subscribe(
                  new ClientSessionChannel.MessageListener() {

                     public void onMessage(ClientSessionChannel channel,
                           Message message) {
                        logger.info("Got Subscribed message");
                        greeting = (String) message.getDataAsMap().get(
                              "greeting");
                     }
                  });

            logger.info("Subscribed, now publishing");
            
            // Publish
            Map<String, Object> data = new HashMap<String, Object>();
            data.put("name", "Tawus");
            client.getChannel("/service/hello").publish(data);

         }

      });

      Thread.sleep(4000);
      assertEquals(greeting, "Hello, Tawus");

      // Disconnect
      client.disconnect();
      client.waitFor(2000, BayeuxClient.State.DISCONNECTED);
   }

   @AfterClass
   public void stopServer() throws Exception {
      server.stop();
      logger.info("Server stopped");
   }
}

I have used a jetty server which is setup and started before the test and shutdown after the test is complete.

Source

You can find the source code along with tests here

About these ads

Tagged: ,

One thought on “Tapestry Magic #15: Integration with cometd

  1. uklance June 22, 2012 at 3:09 PM Reply

    Please see my tapestry-cometd library here: https://github.com/uklance/tapestry-cometd

    Cheers,
    Lance.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

Follow

Get every new post delivered to your Inbox.

Join 91 other followers

%d bloggers like this: