[esb-java-dev] svn commit r24296 - in branches/synapse/1.2.wso2v1: modules/core/src/main/java/org/apache/synapse/config modules/core/src/main/java/org/apache/synapse/config/xml modules/core/src/main/java/org/apache/synapse/config/xml/eventing modules/core/src/main/java/org/apache/synapse/core/axis2 modules/core/src/main/java/org/apache/synapse/eventing modules/core/src/main/java/org/apache/synapse/eventing/filters modules/core/src/main/java/org/apache/synapse/eventing/managers repository/conf

asankaa at wso2.com asankaa at wso2.com
Wed Nov 19 23:52:41 PST 2008


Author: asankaa
Date: Wed Nov 19 23:52:41 2008
New Revision: 24296
URL: http://wso2.org/svn/browse/wso2?view=rev&revision=24296

Log:
initial impl of eventing


Added:
   branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/PropertyHelper.java   (contents, props changed)
   branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/
   branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java   (contents, props changed)
   branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java   (contents, props changed)
   branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/
   branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventFilter.java   (contents, props changed)
   branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventSource.java   (contents, props changed)
   branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventingConstants.java   (contents, props changed)
   branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventingMessageReceiver.java   (contents, props changed)
   branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/Subscription.java   (contents, props changed)
   branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/SubscriptionFactory.java   (contents, props changed)
   branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/SubscriptionManager.java   (contents, props changed)
   branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/filters/
   branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/filters/XPathBasedEventFilter.java   (contents, props changed)
   branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/managers/
   branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java   (contents, props changed)
Modified:
   branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java
   branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationFactory.java
   branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationSerializer.java
   branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java
   branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseInitializationModule.java
   branches/synapse/1.2.wso2v1/repository/conf/synapse.xml

Modified: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java?rev=24296&r1=24295&r2=24296&view=diff
==============================================================================
--- branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java	(original)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java	Wed Nov 19 23:52:41 2008
@@ -25,6 +25,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.*;
+import org.apache.synapse.eventing.EventSource;
 import org.apache.synapse.commons.util.RMIRegistryController;
 import org.apache.synapse.commons.util.datasource.InMemoryDataSourceRepository;
 import org.apache.synapse.commons.util.datasource.JNDIBasedDataSourceRepository;
@@ -110,6 +111,11 @@
 	private String pathToConfigFile = null;
 
     /**
+     * Holds Event Sources defined through Synapse
+     */
+    private Map<String, EventSource> eventSources = new HashMap<String, EventSource>();
+
+    /**
 	 * Add a named sequence into the local registry
 	 *
 	 * @param key
@@ -917,4 +923,23 @@
     public TaskDescriptionRepository getTaskDescriptionRepository() {
         return repository;
     }
+    /**
+     * Stores the given event source in this configuration under the given name. As with any
+     * <code>Map.put</code>, an entry already held under that name is replaced.
+     *
+     * @param name        the key the event source is stored under
+     * @param eventSource the event source to store
+     */
+    public void addEventSource(String name, EventSource eventSource) {
+        this.eventSources.put(name, eventSource);
+    }
+
+    /**
+     * Looks up the event source stored under the given name.
+     *
+     * @param name the key the event source was stored under
+     * @return the matching event source, or <code>null</code> if the map holds none for that name
+     */
+    public EventSource getEventSource(String name) {
+        return this.eventSources.get(name);
+    }
+
+    /**
+     * Removes the event source stored under the given name, if there is one.
+     *
+     * @param name the key of the event source to remove
+     */
+    public void removeEventSource(String name) {
+        this.eventSources.remove(name);
+    }
+
+    /**
+     * Returns the event sources held by this configuration. The returned collection is the
+     * <code>values()</code> view of the backing map, not a copy.
+     *
+     * @return the event sources currently held
+     */
+    public Collection<EventSource> getEventSources() {
+        return this.eventSources.values();
+    }
+
+    /**
+     * Replaces the whole map of event sources with the given one. The map is stored as it
+     * is, without copying.
+     *
+     * @param eventSources the map of event sources, keyed by name, to use from now on
+     */
+    public void setEventSources(Map<String, EventSource> eventSources) {
+        this.eventSources = eventSources;
+    }
 }

Added: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/PropertyHelper.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/PropertyHelper.java?pathrev=24296
==============================================================================
--- (empty file)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/PropertyHelper.java	Wed Nov 19 23:52:41 2008
@@ -0,0 +1,191 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.config.xml;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.SynapseException;
+
+import javax.xml.namespace.QName;
+import java.lang.reflect.Method;
+
+/**
+ * This class will be used as a Helper class to get the properties loaded while building the
+ * Synapse Configuration from the XML
+ */
+public class PropertyHelper {
+
+    /**
+     * Log variable for the logging purposes
+     */
+    private static final Log log = LogFactory.getLog(PropertyHelper.class);
+
+    /**
+     * Find and invoke the setter method with the name of form setXXX passing in the value given
+     * on the POJO object
+     * @param name name of the setter field
+     * @param val value to be set
+     * @param obj POJO instance
+     */
+    public static void setInstanceProperty(String name, Object val, Object obj) {
+
+        String mName = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
+        Method method = null;
+
+        try {
+            Method[] methods = obj.getClass().getMethods();
+            boolean invoked = false;
+
+            for (int i=0; i<methods.length; i++) {
+                if (mName.equals(methods[i].getName())) {
+                    Class[] params = methods[i].getParameterTypes();
+                    if (params.length != 1) {
+                        handleException("Did not find a setter method named : " + mName +
+                            "() that takes a single String, int, long, float, double " +
+                            "or boolean parameter");
+                    } else if (val instanceof String) {
+                        String value = (String) val;
+                        if (params[0].equals(String.class)) {
+                            method = obj.getClass().getMethod(mName, new Class[]{String.class});
+                            method.invoke(obj, new String[]{value});
+                        } else if (params[0].equals(int.class)) {
+                            method = obj.getClass().getMethod(mName, new Class[]{int.class});
+                            method.invoke(obj, new Integer[]{new Integer(value)});
+                        } else if (params[0].equals(long.class)) {
+                            method = obj.getClass().getMethod(mName, new Class[]{long.class});
+                            method.invoke(obj, new Long[]{new Long(value)});
+                        } else if (params[0].equals(float.class)) {
+                            method = obj.getClass().getMethod(mName, new Class[]{float.class});
+                            method.invoke(obj, new Float[]{new Float(value)});
+                        } else if (params[0].equals(double.class)) {
+                            method = obj.getClass().getMethod(mName, new Class[]{double.class});
+                            method.invoke(obj, new Double[]{new Double(value)});
+                        } else if (params[0].equals(boolean.class)) {
+                            method = obj.getClass().getMethod(mName, new Class[]{boolean.class});
+                            method.invoke(obj, new Boolean[]{new Boolean(value)});
+                        } else {
+                            handleException("Did not find a setter method named : " + mName +
+                                "() that takes a single String, int, long, float, double " +
+                                "or boolean parameter");
+                        }
+                    } else {
+                        if (params[0].equals(OMElement.class)) {
+                            method = obj.getClass().getMethod(mName, new Class[]{OMElement.class});
+                            method.invoke(obj, new OMElement[]{(OMElement) val});
+                        }
+                    }
+                    invoked = true;
+                }
+            }
+
+            if (!invoked) {
+                handleException("Did not find a setter method named : " + mName +
+                    "() that takes a single String, int, long, float, double " +
+                    "or boolean parameter");
+            }
+
+        } catch (Exception e) {
+            handleException("Error invoking setter method named : " + mName +
+                "() that takes a single String, int, long, float, double " +
+                "or boolean parameter", e);
+        }
+    }
+
+    /**
+     * This method will set the static property discribed in the OMElement to the specified object.
+     * This Object should have the setter method for the specified property name
+     * 
+     * @param property - OMElement specifying the property to be built in to the object
+     * @param o - Object to which the specified property will be set.
+     */
+    public static void setStaticProperty(OMElement property, Object o) {
+
+        if (property.getLocalName().toLowerCase().equals("property")) {
+
+            String propertyName = property.getAttributeValue(new QName("name"));
+            String mName = "set"
+                    + Character.toUpperCase(propertyName.charAt(0))
+                    + propertyName.substring(1);
+
+            // try to set String value first
+            if (property.getAttributeValue(new QName("value")) != null) {
+                String value = property.getAttributeValue(new QName("value"));
+
+                try {
+                    Method method = o.getClass().getMethod(mName, new Class[]{String.class});
+                    if (log.isDebugEnabled()) {
+                        log.debug("Setting property :: invoking method "
+                                + mName + "(" + value + ")");
+                    }
+                    method.invoke(o, new Object[]{value});
+
+                } catch (Exception e) {
+                    handleException("Error setting property : " + propertyName
+                            + " as a String property into class mediator : " + o.getClass() + " : "
+                            + e.getMessage(), e);
+                }
+                
+            } else {
+                // now try XML child
+                OMElement value = property.getFirstElement();
+                if (value != null) {
+
+                    try {
+                        Method method = o.getClass().getMethod(mName, new Class[]{OMElement.class});
+                        if (log.isDebugEnabled()) {
+                            log.debug("Setting property :: invoking method "
+                                    + mName + "(" + value + ")");
+                        }
+                        method.invoke(o, new Object[]{value});
+
+                    } catch (Exception e) {
+                        handleException("Error setting property : " + propertyName
+                                + " as an OMElement property into class mediator : "
+                                + o.getClass() + " : " + e.getMessage(), e);
+                    }
+
+                }
+
+            }
+        }
+    }
+
+    /**
+     * This method will check the given OMElement represent either a static property or not
+     * 
+     * @param property - OMElement to be checked for the static property
+     * @return boolean true if the elemet represents a static property element false otherwise
+     */
+    public static boolean isStaticProperty(OMElement property) {
+        return "property".equals(property.getLocalName().toLowerCase())
+            && (property.getAttributeValue(new QName("expression")) == null);
+    }
+
+    private static void handleException(String message, Throwable e) {
+        log.error(message + e.getMessage());
+        throw new SynapseException(message, e);
+    }
+
+    private static void handleException(String message) {
+        log.error(message);
+        throw new SynapseException(message);
+    }    
+}

Modified: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationFactory.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationFactory.java?rev=24296&r1=24295&r2=24296&view=diff
==============================================================================
--- branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationFactory.java	(original)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationFactory.java	Wed Nov 19 23:52:41 2008
@@ -27,10 +27,12 @@
 import org.apache.synapse.Startup;
 import org.apache.synapse.SynapseConstants;
 import org.apache.synapse.SynapseException;
+import org.apache.synapse.eventing.EventSource;
 import org.apache.synapse.config.Entry;
 import org.apache.synapse.config.SynapseConfigUtils;
 import org.apache.synapse.config.SynapseConfiguration;
 import org.apache.synapse.config.xml.endpoints.EndpointFactory;
+import org.apache.synapse.config.xml.eventing.EventSourceFactory;
 import org.apache.synapse.core.axis2.ProxyService;
 import org.apache.synapse.endpoints.Endpoint;
 import org.apache.synapse.mediators.MediatorProperty;
@@ -85,6 +87,8 @@
                     defineProxy(config, elt);
                 } else if (XMLConfigConstants.REGISTRY_ELT.equals(elt.getQName())) {
                     defineRegistry(config, elt);
+                } else if (XMLConfigConstants.EVENT_SOURCE_ELT.equals(elt.getQName())) {
+                    defineEventSource(config, elt);                    
                 } else if (XMLConfigConstants.TASK_ELT.equals(elt.getQName())) {
                     defineStartup(config, elt);
                 } else {
@@ -153,6 +157,13 @@
         config.addProxyService(proxy.getName(), proxy);
     }
 
+    /**
+     * Builds an event source from the given configuration element and adds it to the
+     * configuration under its own name.
+     *
+     * @param config the configuration the event source is added to
+     * @param elem   the element the event source is built from
+     * @throws SynapseException if the configuration already holds an event source of that name
+     */
+    private static void defineEventSource(SynapseConfiguration config, OMElement elem) {
+        EventSource eventSource = EventSourceFactory.createEventSource(elem);
+        if (config.getEventSource(eventSource.getName()) != null) {
+            handleException("Duplicate event source with name : " + eventSource.getName());
+        }
+        config.addEventSource(eventSource.getName(), eventSource);
+    }
     private static void defineEntry(SynapseConfiguration config, OMElement elem) {
         Entry entry = EntryFactory.createEntry(elem);
         if (config.getLocalRegistry().get(entry.getKey()) != null) {

Modified: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationSerializer.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationSerializer.java?rev=24296&r1=24295&r2=24296&view=diff
==============================================================================
--- branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationSerializer.java	(original)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationSerializer.java	Wed Nov 19 23:52:41 2008
@@ -19,9 +19,11 @@
 import org.apache.synapse.config.Entry;
 import org.apache.synapse.config.SynapseConfiguration;
 import org.apache.synapse.config.xml.endpoints.EndpointSerializer;
+import org.apache.synapse.config.xml.eventing.EventSourceSerializer;
 import org.apache.synapse.core.axis2.ProxyService;
 import org.apache.synapse.endpoints.Endpoint;
 import org.apache.synapse.Startup;
+import org.apache.synapse.eventing.EventSource;
 
 public class SynapseXMLConfigurationSerializer implements ConfigurationSerializer {
 
@@ -60,6 +62,10 @@
             ProxyServiceSerializer.serializeProxy(definitions, service);
         }
 
+        for (EventSource eventSource : synCfg.getEventSources()) {
+            EventSourceSerializer.serializeEventSource(definitions, eventSource);
+        }
+        
         Map entries = new HashMap();
         Map<String, Endpoint> endpoints = new HashMap<String, Endpoint>();
         Map sequences = new HashMap();

Modified: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java?rev=24296&r1=24295&r2=24296&view=diff
==============================================================================
--- branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java	(original)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java	Wed Nov 19 23:52:41 2008
@@ -20,6 +20,8 @@
 package org.apache.synapse.config.xml;
 
 import org.apache.synapse.SynapseConstants;
+import org.apache.axiom.om.OMNamespace;
+
 import javax.xml.namespace.QName;
 
 /**
@@ -29,6 +31,7 @@
 
     // re-definition of the Synapse NS here to make things easier for the XML config lang code
     public static final String SYNAPSE_NAMESPACE = SynapseConstants.SYNAPSE_NAMESPACE;
+    public static final OMNamespace SYNAPSE_OMNAMESPACE = SynapseConstants.SYNAPSE_OMNAMESPACE;
 
     //- Mediators -
     //-- PropertyMediator --
@@ -55,6 +58,7 @@
     public static final QName REGISTRY_ELT    = new QName(SYNAPSE_NAMESPACE, "registry");
     public static final QName TASK_ELT        = new QName(SYNAPSE_NAMESPACE, "task");
     public static final QName PROXY_ELT       = new QName(SYNAPSE_NAMESPACE, "proxy");
+    public static final QName EVENT_SOURCE_ELT = new QName(SYNAPSE_NAMESPACE, "eventSource"); 
     public static final String NULL_NAMESPACE = "";
     public static final Object QUARTZ_QNAME   =
         new QName("http://www.opensymphony.com/quartz/JobSchedulingData", "quartz");

Added: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java?pathrev=24296
==============================================================================
--- (empty file)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java	Wed Nov 19 23:52:41 2008
@@ -0,0 +1,111 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.config.xml.eventing;
+
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.config.xml.PropertyHelper;
+import org.apache.synapse.config.xml.XMLConfigConstants;
+import org.apache.synapse.eventing.EventSource;
+import org.apache.synapse.eventing.SubscriptionManager;
+
+import javax.xml.namespace.QName;
+import java.util.Iterator;
+
+/**
+ * <eventSource name="blah">
+ *   <subscriptionManager class="org.apache.synapse.events.DefaultInMemorySubscriptionManager">
+ *      <property name="other" value="some text property"/>
+ *   </subscriptionManager>
+ *   <subscription id="static1">
+ *      <filter....>
+ *      <sequence...>
+ *      <endpoint..>
+ *   </subscription>*
+ * <eventSource>
+ */
+public class EventSourceFactory {
+
+    private static final Log log = LogFactory.getLog(EventSourceFactory.class);
+    private static final QName SUBSCRIPTION_MANAGER_QNAME
+            = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "subscriptionManager");
+    private static final QName PROPERTIES_QNAME = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "property");
+    private static final QName WS_EVENTING_QNAME
+            = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "wsEventing");
+
+    public static EventSource createEventSource(OMElement elem) {
+
+        EventSource eventSource = null;
+
+        OMAttribute name = elem.getAttribute(new QName(XMLConfigConstants.NULL_NAMESPACE, "name"));
+        if (name == null) {
+            handleException("The 'name' attribute is required for a event source de");
+        } else {
+            eventSource = new EventSource(name.getAttributeValue());
+        }
+
+        OMElement subscriptionManagerElem = elem.getFirstChildWithName(SUBSCRIPTION_MANAGER_QNAME);
+        if (eventSource != null && subscriptionManagerElem != null) {
+
+            OMAttribute clazz = subscriptionManagerElem.getAttribute(new QName(XMLConfigConstants.NULL_NAMESPACE, "class"));
+            if (clazz != null) {
+                String className = clazz.getAttributeValue();
+                try {
+                    Class subscriptionManagerClass = Class.forName(className);
+                    SubscriptionManager manager = (SubscriptionManager) subscriptionManagerClass.newInstance();
+                    Iterator itr = subscriptionManagerElem.getChildrenWithName(PROPERTIES_QNAME);
+                    while (itr.hasNext()) {
+                        OMElement propElem = (OMElement) itr.next();
+                        String propName = propElem.getAttribute(new QName("name")).getAttributeValue();
+                        String propValue = propElem.getAttribute(new QName("value")).getAttributeValue();
+                        manager.addProperty(propName, propValue);
+                        PropertyHelper.setStaticProperty(propElem, manager);
+                    }
+                    eventSource.setSubscriptionManager(manager);
+                } catch (ClassNotFoundException e) {
+                    handleException("SubscriptionManager class not found", e);
+                } catch (IllegalAccessException e) {
+                    handleException("Unable to access the SubscriptionManager object", e);
+                } catch (InstantiationException e) {
+                    handleException("Unable to instantiate the SubscriptionManager object", e);
+                }
+            } else {
+                handleException("Subscription manager class is a required attribute");
+            }
+        } else {
+            handleException("Subscription Manager has not been specified for the event source");
+        }
+
+        return eventSource;
+    }
+
+    private static void handleException(String msg) {
+        log.error(msg);
+        throw new SynapseException(msg);
+    }
+
+    private static void handleException(String msg, Exception e) {
+        log.error(msg, e);
+        throw new SynapseException(msg, e);
+    }
+}

Added: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java?pathrev=24296
==============================================================================
--- (empty file)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java	Wed Nov 19 23:52:41 2008
@@ -0,0 +1,73 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.config.xml.eventing;
+
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.config.xml.XMLConfigConstants;
+import org.apache.synapse.eventing.EventSource;
+
+/**
+ * Writes an {@link EventSource} out as XML. The element produced holds the name of the
+ * event source and its subscription manager with the properties of the manager:
+ * <pre>
+ * &lt;eventSource name="blah"&gt;
+ *   &lt;subscriptionManager class="..."&gt;
+ *      &lt;property name="other" value="some text property"/&gt;
+ *   &lt;/subscriptionManager&gt;
+ * &lt;/eventSource&gt;
+ * </pre>
+ */
+public class EventSourceSerializer {
+
+    /**
+     * Serializes the given event source and, when a parent is given, appends the result to it.
+     *
+     * @param elem        the parent element to append to, or <code>null</code> for none
+     * @param eventSource the event source to serialize
+     * @return the eventSource element created
+     */
+    public static OMElement serializeEventSource(OMElement elem, EventSource eventSource) {
+
+        OMFactory factory = OMAbstractFactory.getOMFactory();
+        OMNamespace noNamespace = factory.createOMNamespace(XMLConfigConstants.NULL_NAMESPACE, "");
+        OMElement sourceElem = factory.createOMElement("eventSource", XMLConfigConstants.SYNAPSE_OMNAMESPACE);
+
+        if (eventSource.getName() != null) {
+            sourceElem.addAttribute(factory.createOMAttribute("name", noNamespace, eventSource.getName()));
+        }
+
+        if (eventSource.getSubscriptionManager() != null) {
+            sourceElem.addChild(subscriptionManagerElement(factory, noNamespace, eventSource));
+        }
+
+        if (elem != null) {
+            elem.addChild(sourceElem);
+        }
+        return sourceElem;
+    }
+
+    /**
+     * Builds the subscriptionManager element, with the class of the manager as an attribute
+     * and one property child for each property name the manager reports.
+     */
+    private static OMElement subscriptionManagerElement(OMFactory factory, OMNamespace noNamespace,
+                                                        EventSource eventSource) {
+
+        OMElement managerElem = factory.createOMElement("subscriptionManager", XMLConfigConstants.SYNAPSE_OMNAMESPACE);
+        managerElem.addAttribute(factory.createOMAttribute(
+                "class", noNamespace, eventSource.getSubscriptionManager().getClass().getName()));
+
+        for (String propertyName : eventSource.getSubscriptionManager().getPropertyNames()) {
+            OMElement propertyElem = factory.createOMElement("property", XMLConfigConstants.SYNAPSE_OMNAMESPACE);
+            propertyElem.addAttribute(factory.createOMAttribute("name", noNamespace, propertyName));
+            propertyElem.addAttribute(factory.createOMAttribute(
+                    "value", noNamespace, eventSource.getSubscriptionManager().getPropertyValue(propertyName)));
+            managerElem.addChild(propertyElem);
+        }
+        return managerElem;
+    }
+}
\ No newline at end of file

Modified: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseInitializationModule.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseInitializationModule.java?rev=24296&r1=24295&r2=24296&view=diff
==============================================================================
--- branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseInitializationModule.java	(original)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseInitializationModule.java	Wed Nov 19 23:52:41 2008
@@ -33,6 +33,7 @@
 import org.apache.synapse.ServerManager;
 import org.apache.synapse.SynapseConstants;
 import org.apache.synapse.SynapseException;
+import org.apache.synapse.eventing.EventSource;
 import org.apache.synapse.commons.util.datasource.DataSourceInformationRepositoryHelper;
 import org.apache.synapse.config.SynapseConfiguration;
 import org.apache.synapse.config.SynapseConfigurationBuilder;
@@ -150,6 +151,10 @@
             }
         }
 
+       for (EventSource eventSource : synCfg.getEventSources()) {
+            eventSource.buildService(axisCfg);
+            eventSource.getSubscriptionManager().init();
+        }
         synCfg.init(configurationContext);
         
         log.info("Synapse initialized successfully...!");

Added: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventFilter.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventFilter.java?pathrev=24296
==============================================================================
--- (empty file)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventFilter.java	Wed Nov 19 23:52:41 2008
@@ -0,0 +1,30 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.eventing;
+
+import org.apache.synapse.MessageContext;
+
+/**
+ * Decides whether a message matches the interest expressed by a subscription.
+ */
+public interface EventFilter {
+    /** Returns true when the given message satisfies this filter. */
+    boolean isSatisfied(MessageContext mc);
+}

Added: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventSource.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventSource.java?pathrev=24296
==============================================================================
--- (empty file)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventSource.java	Wed Nov 19 23:52:41 2008
@@ -0,0 +1,76 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.eventing;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.InOutAxisOperation;
+import org.apache.axis2.engine.AxisConfiguration;
+import org.apache.synapse.SynapseConstants;
+
+import javax.xml.namespace.QName;
+
+/**
+ * A named event source; buildService exposes it as an Axis2 service backed by an EventingMessageReceiver.
+ */
+public class EventSource {
+
+    private String name;
+    private SubscriptionManager subscriptionManager;
+
+    public EventSource(String name) {
+        this.name = name;
+    }
+
+    public String getName() {
+        return this.name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public SubscriptionManager getSubscriptionManager() {
+        return this.subscriptionManager;
+    }
+
+    public void setSubscriptionManager(SubscriptionManager subscriptionManager) {
+        this.subscriptionManager = subscriptionManager;
+    }
+
+    /** Adds an Axis2 service named after this source with a mediate and a "subscribe" operation. */
+    public void buildService(AxisConfiguration axisCfg) throws AxisFault {
+        // one receiver instance serves both operations
+        EventingMessageReceiver sharedReceiver = new EventingMessageReceiver(this);
+        AxisOperation mediateOp = new InOutAxisOperation(SynapseConstants.SYNAPSE_OPERATION_NAME);
+        mediateOp.setMessageReceiver(sharedReceiver);
+        // NOTE(review): naming the in/out messages for WSDL generation (SYNAPSE-366) is not done here
+        AxisOperation subscribeOp = new InOutAxisOperation(new QName("subscribe"));
+        subscribeOp.setMessageReceiver(sharedReceiver);
+        subscribeOp.setSoapAction(EventingConstants.SUBSCRIBE_ACTION);
+
+        AxisService service = new AxisService();
+        service.setName(this.name);
+        service.addOperation(mediateOp);
+        service.addOperation(subscribeOp);
+        axisCfg.addService(service);
+    }
+}

Added: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventingConstants.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventingConstants.java?pathrev=24296
==============================================================================
--- (empty file)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventingConstants.java	Wed Nov 19 23:52:41 2008
@@ -0,0 +1,40 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.eventing;
+
+/**
+ * String constants for WS-Eventing (2004/08 namespace): action URIs, the Push delivery mode, and the Synapse topic filter dialect.
+ */
+public class EventingConstants {
+
+    public static final String WS_EVENTING_NS = "http://schemas.xmlsoap.org/ws/2004/08/eventing";
+    public static final String SUBSCRIBE_ACTION = "http://schemas.xmlsoap.org/ws/2004/08/eventing/Subscribe";
+    public static final String SUBSCRIBE_RESPONSE_ACTION = "http://schemas.xmlsoap.org/ws/2004/08/eventing/SubscribeResponse";
+    public static final String SUBSCRIPTION_RENEW_ACTION = "http://schemas.xmlsoap.org/ws/2004/08/eventing/Renew";
+    public static final String SUBSCRIPTION_RENEW_RESPONSE_ACTION = "http://schemas.xmlsoap.org/ws/2004/08/eventing/RenewResponse";
+    public static final String SUBSCRIPTION_GET_STATUS_ACTION = "http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatus";
+    public static final String SUBSCRIPTION_GET_STATUS_RESPONSE_ACTION = "http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatusResponse";
+    public static final String UNSUBSCRIBE_ACTION = "http://schemas.xmlsoap.org/ws/2004/08/eventing/Unsubscribe";
+    public static final String UNSUBSCRIBE_RESPONSE_ACTION = "http://schemas.xmlsoap.org/ws/2004/08/eventing/UnsubscribeResponse";
+    public static final String SUBSCRIPTION_END_ACTION = "http://schemas.xmlsoap.org/ws/2004/08/eventing/SubscriptionEnd";
+
+    public static final String PUSH_DELIVERY_MODE = "http://schemas.xmlsoap.org/ws/2004/08/eventing/DeliveryModes/Push";
+    public static final String TOPIC_FILTER_DIALECT = "http://synapse.apache.org/eventing/dialect/topicFilter";
+}

Added: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventingMessageReceiver.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventingMessageReceiver.java?pathrev=24296
==============================================================================
--- (empty file)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventingMessageReceiver.java	Wed Nov 19 23:52:41 2008
@@ -0,0 +1,68 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.eventing;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.MessageContext;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.config.SynapseConfiguration;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.core.axis2.SynapseMessageReceiver;
+import org.apache.synapse.util.MessageHelper;
+
+import java.util.List;
+
+/**
+ * Message receiver of an event source: registers WS-Eventing Subscribe requests and sends every other message to the matching subscribers.
+ */
+public class EventingMessageReceiver extends SynapseMessageReceiver {
+
+    private EventSource eventSource;
+
+    public EventingMessageReceiver(EventSource eventSource) {
+        this.eventSource = eventSource;
+    }
+
+    public void receive(MessageContext mc) throws AxisFault {
+        SynapseConfiguration config = (SynapseConfiguration) mc.getConfigurationContext()
+                .getAxisConfiguration().getParameter(SynapseConstants.SYNAPSE_CONFIG).getValue();
+        SynapseEnvironment environment = (SynapseEnvironment) mc.getConfigurationContext()
+                .getAxisConfiguration().getParameter(SynapseConstants.SYNAPSE_ENV).getValue();
+        org.apache.synapse.MessageContext synCtx = new Axis2MessageContext(mc, config, environment);
+        if (!EventingConstants.SUBSCRIBE_ACTION.equals(mc.getWSAAction())) {
+            // not a Subscribe request: send a clone of the message to every matching subscriber
+            for (Subscription target : eventSource.getSubscriptionManager().getMatchingSubscribers(synCtx)) {
+                target.getEndpoint().send(MessageHelper.cloneMessageContext(synCtx));
+            }
+            return;
+        }
+        Subscription created = SubscriptionFactory.createSubscription(synCtx);
+        eventSource.getSubscriptionManager().addSubscription(created);
+    }
+
+    public EventSource getEventSource() {
+        return this.eventSource;
+    }
+
+    public void setEventSource(EventSource eventSource) {
+        this.eventSource = eventSource;
+    }
+}

Added: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/Subscription.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/Subscription.java?pathrev=24296
==============================================================================
--- (empty file)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/Subscription.java	Wed Nov 19 23:52:41 2008
@@ -0,0 +1,87 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.eventing;
+
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.util.UUIDGenerator;
+
+import java.util.Date;
+
+/**
+ * A single subscription: target endpoint, optional filter, delivery mode, id and expiry time.
+ */
+public class Subscription {
+
+    private EventFilter filter;
+    private Endpoint endpoint;
+    private String deliveryMode;
+    private String id;
+    private Date expires;
+
+    /** Creates a subscription with a generated UUID and the WS-Eventing Push delivery mode. */
+    public Subscription() {
+        this(EventingConstants.PUSH_DELIVERY_MODE);
+    }
+
+    /** Creates a subscription with a generated UUID and the given delivery mode. */
+    public Subscription(String deliveryMode) {
+        this.deliveryMode = deliveryMode;
+        this.id = UUIDGenerator.getUUID();
+    }
+
+    public EventFilter getFilter() {
+        return this.filter;
+    }
+
+    public void setFilter(EventFilter filter) {
+        this.filter = filter;
+    }
+
+    public Endpoint getEndpoint() {
+        return this.endpoint;
+    }
+
+    public void setEndpoint(Endpoint endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    public String getDeliveryMode() {
+        return this.deliveryMode;
+    }
+
+    public void setDeliveryMode(String deliveryMode) {
+        this.deliveryMode = deliveryMode;
+    }
+
+    public String getId() {
+        return this.id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public Date getExpires() {
+        return this.expires;
+    }
+
+    public void setExpires(Date expires) {
+        this.expires = expires;
+    }
+}

Added: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/SubscriptionFactory.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/SubscriptionFactory.java?pathrev=24296
==============================================================================
--- (empty file)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/SubscriptionFactory.java	Wed Nov 19 23:52:41 2008
@@ -0,0 +1,118 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.eventing;
+
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.config.xml.SynapseXPathFactory;
+import org.apache.synapse.config.xml.XMLConfigConstants;
+import org.apache.synapse.endpoints.AddressEndpoint;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.endpoints.EndpointDefinition;
+import org.apache.synapse.eventing.filters.XPathBasedEventFilter;
+import org.apache.synapse.util.xpath.SynapseXPath;
+import org.jaxen.JaxenException;
+
+import javax.xml.namespace.QName;
+
+/**
+ *
+ */
+public class SubscriptionFactory {
+
+    private static final Log log = LogFactory.getLog(SubscriptionFactory.class);
+
+    private static final QName SUBSCRIBE_QNAME  = new QName(EventingConstants.WS_EVENTING_NS, "Subscribe");
+    private static final QName DELIVERY_QNAME   = new QName(EventingConstants.WS_EVENTING_NS, "Delivery");
+    private static final QName FILTER_QNAME     = new QName(EventingConstants.WS_EVENTING_NS, "Filter");
+    private static final QName NOTIFY_TO_QNAME  = new QName(EventingConstants.WS_EVENTING_NS, "NotifyTo");
+    private static final QName ATT_DIALECT      = new QName(XMLConfigConstants.NULL_NAMESPACE, "Dialect");
+    private static final QName ATT_XPATH        = new QName(XMLConfigConstants.NULL_NAMESPACE, "XPath");
+
+    public static Subscription createSubscription(MessageContext mc) {
+        Subscription subscription = null;
+        OMElement elem = mc.getEnvelope().getBody().getFirstChildWithName(SUBSCRIBE_QNAME);
+        if (elem != null) {
+
+            OMElement deliveryElem = elem.getFirstChildWithName(DELIVERY_QNAME);
+            if (deliveryElem != null) {
+                OMElement notifyToElem = deliveryElem.getFirstChildWithName(NOTIFY_TO_QNAME);
+                if (notifyToElem != null) {
+                    Endpoint ep = getEndpointFromWSAAddress(notifyToElem.getFirstElement());
+                    if (ep != null) {
+                        subscription = new Subscription(EventingConstants.PUSH_DELIVERY_MODE);
+                        subscription.setEndpoint(ep);
+                    }
+                } else {
+                    handleException("NotifyTo element not found in the subscription message");
+                }
+            } else {
+                handleException("Delivery element is not found in the subscription message");
+            }
+
+            OMElement filterElem = elem.getFirstChildWithName(FILTER_QNAME);
+            if (subscription != null && filterElem != null) {
+                OMAttribute dialectAttr = filterElem.getAttribute(ATT_DIALECT);
+                if (dialectAttr != null && dialectAttr.getAttributeValue() != null) {
+                    if (EventingConstants.TOPIC_FILTER_DIALECT.equals(dialectAttr.getAttributeValue())) {
+                        XPathBasedEventFilter filter = new XPathBasedEventFilter();
+                        filter.setResultValue(filterElem.getText());
+                        if (filterElem.getAttribute(ATT_XPATH) != null) {
+                            try {
+                                SynapseXPath xpath = SynapseXPathFactory.getSynapseXPath(filterElem, ATT_XPATH);
+                                filter.setSourceXpath(xpath);
+                            } catch (JaxenException e) {
+                                handleException("Unable to create the EventFilter xpath", e);
+                            }
+                        }
+                        subscription.setFilter(filter);
+                    }
+                } else {
+                    handleException("Error in creating subscription. Filter dialect not defined");
+                }
+            }
+        } else {
+            handleException("Subscribe element is required as the payload of the subscription message");
+        }
+        return subscription;
+    }
+
+    private static Endpoint getEndpointFromWSAAddress(OMElement address) {
+        AddressEndpoint endpoint = new AddressEndpoint();
+        EndpointDefinition def = new EndpointDefinition();
+        def.setAddress(address.getText().trim());
+        endpoint.setDefinition(def);
+        return endpoint;
+    }
+
+    private static void handleException(String message) {
+        log.error(message);
+        throw new SynapseException(message);
+    }
+
+    private static void handleException(String message, Exception e) {
+        log.error(message, e);
+        throw new SynapseException(message, e);
+    }
+}

Added: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/SubscriptionManager.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/SubscriptionManager.java?pathrev=24296
==============================================================================
--- (empty file)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/SubscriptionManager.java	Wed Nov 19 23:52:41 2008
@@ -0,0 +1,56 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.eventing;
+
+import org.apache.synapse.MessageContext;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class for subscription stores; also carries a set of named string properties.
+ */
+public abstract class SubscriptionManager {
+
+    private final Map<String, String> properties = new HashMap<String, String>();
+
+    /** Returns the subscriptions held by this manager. */
+    public abstract List<Subscription> getSubscribers();
+
+    /** Returns the subscriptions that should receive the given message. */
+    public abstract List<Subscription> getMatchingSubscribers(MessageContext mc);
+
+    /** Looks up a subscription by its id. */
+    public abstract Subscription getSubscription(String id);
+
+    /** Stores a subscription; the in-memory implementation returns the id it was stored under. */
+    public abstract String addSubscription(Subscription subs);
+
+    /** Removes a subscription by id; the in-memory implementation reports whether one was removed. */
+    public abstract boolean deleteSubscription(String id);
+
+    /** Invoked by SynapseInitializationModule for each event source after its service has been built. */
+    public abstract void init();
+
+    public void addProperty(String name, String value) {
+        this.properties.put(name, value);
+    }
+
+    public Collection<String> getPropertyNames() {
+        return this.properties.keySet();
+    }
+
+    public String getPropertyValue(String name) {
+        return this.properties.get(name);
+    }
+}

Added: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/filters/XPathBasedEventFilter.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/filters/XPathBasedEventFilter.java?pathrev=24296
==============================================================================
--- (empty file)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/filters/XPathBasedEventFilter.java	Wed Nov 19 23:52:41 2008
@@ -0,0 +1,54 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.eventing.filters;
+
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.eventing.EventFilter;
+import org.apache.synapse.util.xpath.SynapseXPath;
+
+/**
+ * Event filter that evaluates an XPath against the message and compares the string result with an expected value.
+ */
+public class XPathBasedEventFilter implements EventFilter {
+
+    private SynapseXPath sourceXpath;
+    private String resultValue;
+
+    /**
+     * Checks whether the string value of the source XPath over {@code mc} equals the expected result value.
+     *
+     * @param mc the message to test
+     * @return true only when an XPath is configured and its evaluated value equals the result value;
+     *         false when no XPath is set or the evaluation yields null
+     */
+    public boolean isSatisfied(MessageContext mc) {
+        if (sourceXpath == null) {
+            // SubscriptionFactory creates this filter without an XPath when the Filter element has no
+            // XPath attribute; report "no match" instead of failing the whole dispatch with an NPE
+            return false;
+        }
+        String evaluatedValue = sourceXpath.stringValueOf(mc);
+        return evaluatedValue != null && evaluatedValue.equals(resultValue);
+    }
+
+    public SynapseXPath getSourceXpath() {
+        return sourceXpath;
+    }
+
+    public void setSourceXpath(SynapseXPath sourceXpath) {
+        this.sourceXpath = sourceXpath;
+    }
+
+    public String getResultValue() {
+        return resultValue;
+    }
+
+    public void setResultValue(String resultValue) {
+        this.resultValue = resultValue;
+    }
+}

Added: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java?pathrev=24296
==============================================================================
--- (empty file)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java	Wed Nov 19 23:52:41 2008
@@ -0,0 +1,82 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.eventing.managers;
+
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.eventing.EventFilter;
+import org.apache.synapse.eventing.Subscription;
+import org.apache.synapse.eventing.SubscriptionManager;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Subscription manager that keeps all subscriptions in a ConcurrentHashMap keyed by subscription id.
+ */
+public class DefaultInMemorySubscriptionManager extends SubscriptionManager {
+
+    Map<String, Subscription> store = new ConcurrentHashMap<String, Subscription>();
+
+    /**
+     * Stores the subscription, assigning a generated UUID first when it has no id.
+     *
+     * @return the id under which the subscription was stored
+     */
+    public String addSubscription(Subscription subs) {
+        if (subs.getId() == null) {
+            subs.setId(org.apache.axiom.om.util.UUIDGenerator.getUUID());
+        }
+        store.put(subs.getId(), subs);
+        return subs.getId();
+    }
+
+    /**
+     * Removes the subscription with the given id.
+     *
+     * @return true if a subscription was removed, false if the id is null or unknown
+     */
+    public boolean deleteSubscription(String id) {
+        // a single remove() is atomic; a containsKey()/remove() pair lets two concurrent
+        // callers both report success for the same subscription
+        return id != null && store.remove(id) != null;
+    }
+
+    /** Returns a new list holding every stored subscription. */
+    public List<Subscription> getSubscribers() {
+        return new LinkedList<Subscription>(store.values());
+    }
+
+    /** Returns the subscriptions that have no filter or whose filter is satisfied by {@code mc}. */
+    public List<Subscription> getMatchingSubscribers(MessageContext mc) {
+        List<Subscription> matching = new LinkedList<Subscription>();
+        for (Subscription candidate : store.values()) {
+            EventFilter filter = candidate.getFilter();
+            if (filter == null || filter.isSatisfied(mc)) {
+                matching.add(candidate);
+            }
+        }
+        return matching;
+    }
+
+    /** Returns the subscription stored under the given id, or null if the id is null or unknown. */
+    public Subscription getSubscription(String id) {
+        // ConcurrentHashMap rejects null keys, so answer "not found" for a null id
+        return id == null ? null : store.get(id);
+    }
+
+    public void init() {
+        // nothing to do at the inmemory subscriber initialization
+    }
+}

Modified: branches/synapse/1.2.wso2v1/repository/conf/synapse.xml
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/repository/conf/synapse.xml?rev=24296&r1=24295&r2=24296&view=diff
==============================================================================
--- branches/synapse/1.2.wso2v1/repository/conf/synapse.xml	(original)
+++ branches/synapse/1.2.wso2v1/repository/conf/synapse.xml	Wed Nov 19 23:52:41 2008
@@ -19,18 +19,14 @@
 <!-- A simple Synapse configuration -->
 <definitions xmlns="http://ws.apache.org/ns/synapse">
 
-    <in>
-        <!-- Log all messages passing through -->
-        <log level="full"/>
-        
-        <!-- ensure that the default configuration only sends if it is one of samples -->
-        <!-- Otherwise Synapse would be an open proxy by default (BAD!)               -->
-        <filter source="get-property('To')" regex="http://localhost:9000.*">
-            <!-- Send the messages where they have been sent (i.e. implicit "To" EPR) -->
-            <send/>
-        </filter>
-    </in>
-    <out>
-        <send/>
-    </out>
-</definitions>
\ No newline at end of file
+ <eventSource name="eventing">
+   <subscriptionManager class="org.apache.synapse.eventing.managers.RegistryBasedSubscriptionManager">
+   <property name="registryURL" value="http://localhost:8180/wso2registry"/>
+   <property name="username" value="admin"/>
+   <property name="password" value="admin"/>
+   <property name="topicHeaderName" value="Topic"/>
+   <property name="topicHeaderNS" value="http://wso2.com/aip"/>
+   </subscriptionManager>
+ </eventSource>
+ <log level="full"/>
+</definitions> 



More information about the Esb-java-dev mailing list