[esb-java-dev] svn commit r24296 - in branches/synapse/1.2.wso2v1: modules/core/src/main/java/org/apache/synapse/config modules/core/src/main/java/org/apache/synapse/config/xml modules/core/src/main/java/org/apache/synapse/config/xml/eventing modules/core/src/main/java/org/apache/synapse/core/axis2 modules/core/src/main/java/org/apache/synapse/eventing modules/core/src/main/java/org/apache/synapse/eventing/filters modules/core/src/main/java/org/apache/synapse/eventing/managers repository/conf
asankaa at wso2.com
asankaa at wso2.com
Wed Nov 19 23:52:41 PST 2008
Author: asankaa
Date: Wed Nov 19 23:52:41 2008
New Revision: 24296
URL: http://wso2.org/svn/browse/wso2?view=rev&revision=24296
Log:
initial impl of eventing
Added:
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/PropertyHelper.java (contents, props changed)
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java (contents, props changed)
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java (contents, props changed)
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventFilter.java (contents, props changed)
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventSource.java (contents, props changed)
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventingConstants.java (contents, props changed)
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventingMessageReceiver.java (contents, props changed)
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/Subscription.java (contents, props changed)
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/SubscriptionFactory.java (contents, props changed)
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/SubscriptionManager.java (contents, props changed)
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/filters/
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/filters/XPathBasedEventFilter.java (contents, props changed)
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/managers/
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java (contents, props changed)
Modified:
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationFactory.java
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationSerializer.java
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseInitializationModule.java
branches/synapse/1.2.wso2v1/repository/conf/synapse.xml
Modified: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java?rev=24296&r1=24295&r2=24296&view=diff
==============================================================================
--- branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java (original)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java Wed Nov 19 23:52:41 2008
@@ -25,6 +25,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.*;
+import org.apache.synapse.eventing.EventSource;
import org.apache.synapse.commons.util.RMIRegistryController;
import org.apache.synapse.commons.util.datasource.InMemoryDataSourceRepository;
import org.apache.synapse.commons.util.datasource.JNDIBasedDataSourceRepository;
@@ -110,6 +111,11 @@
private String pathToConfigFile = null;
/**
+ * Holds Event Sources defined through Synapse
+ */
+ private Map<String, EventSource> eventSources = new HashMap<String, EventSource>();
+
+ /**
* Add a named sequence into the local registry
*
* @param key
@@ -917,4 +923,23 @@
public TaskDescriptionRepository getTaskDescriptionRepository() {
return repository;
}
+ /**
+  * Registers an event source in this configuration under the given name.
+  * The entry is stored with <code>Map.put</code>, so an event source already
+  * registered under the same name is replaced.
+  *
+  * @param name        key under which the event source is stored
+  * @param eventSource the event source to register
+  */
+ public void addEventSource(String name, EventSource eventSource) {
+ eventSources.put(name, eventSource);
+ }
+
+ /**
+  * Looks up an event source by the name it was registered under.
+  *
+  * @param name key of the event source
+  * @return the event source stored under that name, or null if the map has no such key
+  */
+ public EventSource getEventSource(String name) {
+ return eventSources.get(name);
+ }
+
+ /**
+  * Removes the event source registered under the given name from this configuration.
+  * Only the map entry is removed; nothing else is done to the event source itself here.
+  *
+  * @param name key of the event source to remove
+  */
+ public void removeEventSource(String name) {
+ eventSources.remove(name);
+ }
+
+ /**
+  * Returns all event sources held by this configuration.
+  * The result is the backing map's own <code>values()</code> view, not a copy.
+  *
+  * @return the collection of registered event sources
+  */
+ public Collection<EventSource> getEventSources() {
+ return eventSources.values();
+ }
+
+ /**
+  * Replaces the complete set of event sources held by this configuration.
+  * The given map is stored as is (no copy is taken). A null argument is treated
+  * as "no event sources" and stored as an empty map, so that the other event
+  * source accessors keep working instead of failing with a NullPointerException.
+  *
+  * @param eventSources event sources keyed by name; may be null
+  */
+ public void setEventSources(Map<String, EventSource> eventSources) {
+     this.eventSources = (eventSources != null)
+             ? eventSources : new HashMap<String, EventSource>();
+ }
}
Added: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/PropertyHelper.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/PropertyHelper.java?pathrev=24296
==============================================================================
--- (empty file)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/PropertyHelper.java Wed Nov 19 23:52:41 2008
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.config.xml;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.SynapseException;
+
+import javax.xml.namespace.QName;
+import java.lang.reflect.Method;
+
+/**
+ * This class will be used as a Helper class to get the properties loaded while building the
+ * Synapse Configuration from the XML
+ */
+public class PropertyHelper {
+
+ /**
+ * Log variable for the logging purposes
+ */
+ private static final Log log = LogFactory.getLog(PropertyHelper.class);
+
+ /**
+ * Find and invoke the setter method with the name of form setXXX passing in the value given
+ * on the POJO object
+ * @param name name of the setter field
+ * @param val value to be set
+ * @param obj POJO instance
+ */
+ public static void setInstanceProperty(String name, Object val, Object obj) {
+
+ String mName = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
+ Method method = null;
+
+ try {
+ Method[] methods = obj.getClass().getMethods();
+ boolean invoked = false;
+
+ for (int i=0; i<methods.length; i++) {
+ if (mName.equals(methods[i].getName())) {
+ Class[] params = methods[i].getParameterTypes();
+ if (params.length != 1) {
+ handleException("Did not find a setter method named : " + mName +
+ "() that takes a single String, int, long, float, double " +
+ "or boolean parameter");
+ } else if (val instanceof String) {
+ String value = (String) val;
+ if (params[0].equals(String.class)) {
+ method = obj.getClass().getMethod(mName, new Class[]{String.class});
+ method.invoke(obj, new String[]{value});
+ } else if (params[0].equals(int.class)) {
+ method = obj.getClass().getMethod(mName, new Class[]{int.class});
+ method.invoke(obj, new Integer[]{new Integer(value)});
+ } else if (params[0].equals(long.class)) {
+ method = obj.getClass().getMethod(mName, new Class[]{long.class});
+ method.invoke(obj, new Long[]{new Long(value)});
+ } else if (params[0].equals(float.class)) {
+ method = obj.getClass().getMethod(mName, new Class[]{float.class});
+ method.invoke(obj, new Float[]{new Float(value)});
+ } else if (params[0].equals(double.class)) {
+ method = obj.getClass().getMethod(mName, new Class[]{double.class});
+ method.invoke(obj, new Double[]{new Double(value)});
+ } else if (params[0].equals(boolean.class)) {
+ method = obj.getClass().getMethod(mName, new Class[]{boolean.class});
+ method.invoke(obj, new Boolean[]{new Boolean(value)});
+ } else {
+ handleException("Did not find a setter method named : " + mName +
+ "() that takes a single String, int, long, float, double " +
+ "or boolean parameter");
+ }
+ } else {
+ if (params[0].equals(OMElement.class)) {
+ method = obj.getClass().getMethod(mName, new Class[]{OMElement.class});
+ method.invoke(obj, new OMElement[]{(OMElement) val});
+ }
+ }
+ invoked = true;
+ }
+ }
+
+ if (!invoked) {
+ handleException("Did not find a setter method named : " + mName +
+ "() that takes a single String, int, long, float, double " +
+ "or boolean parameter");
+ }
+
+ } catch (Exception e) {
+ handleException("Error invoking setter method named : " + mName +
+ "() that takes a single String, int, long, float, double " +
+ "or boolean parameter", e);
+ }
+ }
+
+ /**
+ * This method will set the static property discribed in the OMElement to the specified object.
+ * This Object should have the setter method for the specified property name
+ *
+ * @param property - OMElement specifying the property to be built in to the object
+ * @param o - Object to which the specified property will be set.
+ */
+ public static void setStaticProperty(OMElement property, Object o) {
+
+ if (property.getLocalName().toLowerCase().equals("property")) {
+
+ String propertyName = property.getAttributeValue(new QName("name"));
+ String mName = "set"
+ + Character.toUpperCase(propertyName.charAt(0))
+ + propertyName.substring(1);
+
+ // try to set String value first
+ if (property.getAttributeValue(new QName("value")) != null) {
+ String value = property.getAttributeValue(new QName("value"));
+
+ try {
+ Method method = o.getClass().getMethod(mName, new Class[]{String.class});
+ if (log.isDebugEnabled()) {
+ log.debug("Setting property :: invoking method "
+ + mName + "(" + value + ")");
+ }
+ method.invoke(o, new Object[]{value});
+
+ } catch (Exception e) {
+ handleException("Error setting property : " + propertyName
+ + " as a String property into class mediator : " + o.getClass() + " : "
+ + e.getMessage(), e);
+ }
+
+ } else {
+ // now try XML child
+ OMElement value = property.getFirstElement();
+ if (value != null) {
+
+ try {
+ Method method = o.getClass().getMethod(mName, new Class[]{OMElement.class});
+ if (log.isDebugEnabled()) {
+ log.debug("Setting property :: invoking method "
+ + mName + "(" + value + ")");
+ }
+ method.invoke(o, new Object[]{value});
+
+ } catch (Exception e) {
+ handleException("Error setting property : " + propertyName
+ + " as an OMElement property into class mediator : "
+ + o.getClass() + " : " + e.getMessage(), e);
+ }
+
+ }
+
+ }
+ }
+ }
+
+ /**
+ * This method will check the given OMElement represent either a static property or not
+ *
+ * @param property - OMElement to be checked for the static property
+ * @return boolean true if the elemet represents a static property element false otherwise
+ */
+ public static boolean isStaticProperty(OMElement property) {
+ return "property".equals(property.getLocalName().toLowerCase())
+ && (property.getAttributeValue(new QName("expression")) == null);
+ }
+
+ private static void handleException(String message, Throwable e) {
+ log.error(message + e.getMessage());
+ throw new SynapseException(message, e);
+ }
+
+ private static void handleException(String message) {
+ log.error(message);
+ throw new SynapseException(message);
+ }
+}
Modified: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationFactory.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationFactory.java?rev=24296&r1=24295&r2=24296&view=diff
==============================================================================
--- branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationFactory.java (original)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationFactory.java Wed Nov 19 23:52:41 2008
@@ -27,10 +27,12 @@
import org.apache.synapse.Startup;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
+import org.apache.synapse.eventing.EventSource;
import org.apache.synapse.config.Entry;
import org.apache.synapse.config.SynapseConfigUtils;
import org.apache.synapse.config.SynapseConfiguration;
import org.apache.synapse.config.xml.endpoints.EndpointFactory;
+import org.apache.synapse.config.xml.eventing.EventSourceFactory;
import org.apache.synapse.core.axis2.ProxyService;
import org.apache.synapse.endpoints.Endpoint;
import org.apache.synapse.mediators.MediatorProperty;
@@ -85,6 +87,8 @@
defineProxy(config, elt);
} else if (XMLConfigConstants.REGISTRY_ELT.equals(elt.getQName())) {
defineRegistry(config, elt);
+ } else if (XMLConfigConstants.EVENT_SOURCE_ELT.equals(elt.getQName())) {
+ defineEventSource(config, elt);
} else if (XMLConfigConstants.TASK_ELT.equals(elt.getQName())) {
defineStartup(config, elt);
} else {
@@ -153,6 +157,13 @@
config.addProxyService(proxy.getName(), proxy);
}
+ /**
+  * Builds an event source from the given configuration element and registers it with
+  * the configuration under its own name. If an event source with that name is already
+  * registered, this is reported through handleException as a duplicate definition.
+  *
+  * @param config configuration the event source is added to
+  * @param elem   the event source element
+  */
+ private static void defineEventSource(SynapseConfiguration config, OMElement elem) {
+     EventSource eventSource = EventSourceFactory.createEventSource(elem);
+     if (config.getEventSource(eventSource.getName()) != null) {
+         handleException("Duplicate event source with name : " + eventSource.getName());
+     }
+     config.addEventSource(eventSource.getName(), eventSource);
+ }
private static void defineEntry(SynapseConfiguration config, OMElement elem) {
Entry entry = EntryFactory.createEntry(elem);
if (config.getLocalRegistry().get(entry.getKey()) != null) {
Modified: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationSerializer.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationSerializer.java?rev=24296&r1=24295&r2=24296&view=diff
==============================================================================
--- branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationSerializer.java (original)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationSerializer.java Wed Nov 19 23:52:41 2008
@@ -19,9 +19,11 @@
import org.apache.synapse.config.Entry;
import org.apache.synapse.config.SynapseConfiguration;
import org.apache.synapse.config.xml.endpoints.EndpointSerializer;
+import org.apache.synapse.config.xml.eventing.EventSourceSerializer;
import org.apache.synapse.core.axis2.ProxyService;
import org.apache.synapse.endpoints.Endpoint;
import org.apache.synapse.Startup;
+import org.apache.synapse.eventing.EventSource;
public class SynapseXMLConfigurationSerializer implements ConfigurationSerializer {
@@ -60,6 +62,10 @@
ProxyServiceSerializer.serializeProxy(definitions, service);
}
+ for (EventSource eventSource : synCfg.getEventSources()) {
+ EventSourceSerializer.serializeEventSource(definitions, eventSource);
+ }
+
Map entries = new HashMap();
Map<String, Endpoint> endpoints = new HashMap<String, Endpoint>();
Map sequences = new HashMap();
Modified: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java?rev=24296&r1=24295&r2=24296&view=diff
==============================================================================
--- branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java (original)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java Wed Nov 19 23:52:41 2008
@@ -20,6 +20,8 @@
package org.apache.synapse.config.xml;
import org.apache.synapse.SynapseConstants;
+import org.apache.axiom.om.OMNamespace;
+
import javax.xml.namespace.QName;
/**
@@ -29,6 +31,7 @@
// re-definition of the Synapse NS here to make things easier for the XML config lang code
public static final String SYNAPSE_NAMESPACE = SynapseConstants.SYNAPSE_NAMESPACE;
+ public static final OMNamespace SYNAPSE_OMNAMESPACE = SynapseConstants.SYNAPSE_OMNAMESPACE;
//- Mediators -
//-- PropertyMediator --
@@ -55,6 +58,7 @@
public static final QName REGISTRY_ELT = new QName(SYNAPSE_NAMESPACE, "registry");
public static final QName TASK_ELT = new QName(SYNAPSE_NAMESPACE, "task");
public static final QName PROXY_ELT = new QName(SYNAPSE_NAMESPACE, "proxy");
+ public static final QName EVENT_SOURCE_ELT = new QName(SYNAPSE_NAMESPACE, "eventSource");
public static final String NULL_NAMESPACE = "";
public static final Object QUARTZ_QNAME =
new QName("http://www.opensymphony.com/quartz/JobSchedulingData", "quartz");
Added: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java?pathrev=24296
==============================================================================
--- (empty file)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java Wed Nov 19 23:52:41 2008
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.config.xml.eventing;
+
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.config.xml.PropertyHelper;
+import org.apache.synapse.config.xml.XMLConfigConstants;
+import org.apache.synapse.eventing.EventSource;
+import org.apache.synapse.eventing.SubscriptionManager;
+
+import javax.xml.namespace.QName;
+import java.util.Iterator;
+
+/**
+ * Builds {@link EventSource} instances from their XML configuration, which has the form:
+ * <pre>
+ * &lt;eventSource name="blah"&gt;
+ *   &lt;subscriptionManager
+ *       class="org.apache.synapse.eventing.managers.DefaultInMemorySubscriptionManager"&gt;
+ *     &lt;property name="other" value="some text property"/&gt;
+ *   &lt;/subscriptionManager&gt;
+ *   &lt;subscription id="static1"&gt;
+ *     &lt;filter..../&gt;
+ *     &lt;sequence.../&gt;
+ *     &lt;endpoint../&gt;
+ *   &lt;/subscription&gt;*
+ * &lt;/eventSource&gt;
+ * </pre>
+ * This factory reads only the <code>name</code> attribute and the
+ * <code>subscriptionManager</code> child; <code>subscription</code> children are not
+ * processed here.
+ */
+public class EventSourceFactory {
+
+    private static final Log log = LogFactory.getLog(EventSourceFactory.class);
+    private static final QName SUBSCRIPTION_MANAGER_QNAME
+            = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "subscriptionManager");
+    private static final QName PROPERTIES_QNAME
+            = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "property");
+    // not referenced anywhere in this class at present
+    private static final QName WS_EVENTING_QNAME
+            = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "wsEventing");
+
+    private static final QName ATT_NAME = new QName(XMLConfigConstants.NULL_NAMESPACE, "name");
+    private static final QName ATT_CLASS = new QName(XMLConfigConstants.NULL_NAMESPACE, "class");
+    private static final QName ATT_VALUE = new QName(XMLConfigConstants.NULL_NAMESPACE, "value");
+
+    /**
+     * Creates an event source from an <code>eventSource</code> element.
+     * <p>
+     * The subscription manager class named by the <code>class</code> attribute is
+     * instantiated through its no-argument constructor. Each <code>property</code> child is
+     * applied to it through {@link PropertyHelper#setStaticProperty}. Properties given as a
+     * <code>value</code> attribute are additionally recorded on the manager with
+     * <code>addProperty</code>; properties given as an XML child have no text value and are
+     * only applied through the setter.
+     *
+     * @param elem the <code>eventSource</code> element
+     * @return the event source, with its subscription manager set
+     * @throws SynapseException if the <code>name</code> attribute, the
+     *                          <code>subscriptionManager</code> child, its
+     *                          <code>class</code> attribute or a property
+     *                          <code>name</code> is missing, or if the subscription
+     *                          manager cannot be created
+     */
+    public static EventSource createEventSource(OMElement elem) {
+
+        // note: every handleException call below throws, so the code after it is not reached
+        String name = elem.getAttributeValue(ATT_NAME);
+        if (name == null) {
+            handleException("The 'name' attribute is required for an event source definition");
+        }
+        EventSource eventSource = new EventSource(name);
+
+        OMElement subscriptionManagerElem = elem.getFirstChildWithName(SUBSCRIPTION_MANAGER_QNAME);
+        if (subscriptionManagerElem == null) {
+            handleException("Subscription Manager has not been specified for the event source");
+        }
+
+        String className = subscriptionManagerElem.getAttributeValue(ATT_CLASS);
+        if (className == null) {
+            handleException("Subscription manager class is a required attribute");
+        }
+
+        SubscriptionManager manager = createSubscriptionManager(className);
+
+        Iterator itr = subscriptionManagerElem.getChildrenWithName(PROPERTIES_QNAME);
+        while (itr.hasNext()) {
+            OMElement propElem = (OMElement) itr.next();
+            String propName = propElem.getAttributeValue(ATT_NAME);
+            if (propName == null) {
+                handleException("The 'name' attribute is required for a property of the "
+                        + "subscription manager");
+            }
+            // a property may carry an XML child instead of a 'value' attribute
+            String propValue = propElem.getAttributeValue(ATT_VALUE);
+            if (propValue != null) {
+                manager.addProperty(propName, propValue);
+            }
+            PropertyHelper.setStaticProperty(propElem, manager);
+        }
+
+        eventSource.setSubscriptionManager(manager);
+        return eventSource;
+    }
+
+    /**
+     * Loads the named class and instantiates it as a subscription manager.
+     *
+     * @param className fully qualified name of the subscription manager class
+     * @return the new subscription manager instance
+     * @throws SynapseException if the class cannot be found, accessed or instantiated, or
+     *                          if it is not a SubscriptionManager
+     */
+    private static SubscriptionManager createSubscriptionManager(String className) {
+        try {
+            return (SubscriptionManager) Class.forName(className).newInstance();
+        } catch (ClassNotFoundException e) {
+            handleException("SubscriptionManager class not found : " + className, e);
+        } catch (IllegalAccessException e) {
+            handleException("Unable to access the SubscriptionManager object", e);
+        } catch (InstantiationException e) {
+            handleException("Unable to instantiate the SubscriptionManager object", e);
+        } catch (ClassCastException e) {
+            handleException("Class " + className + " is not a SubscriptionManager", e);
+        }
+        // never reached: each handler above throws
+        return null;
+    }
+
+    /**
+     * Logs the message and throws a SynapseException carrying it.
+     *
+     * @param msg description of the failure
+     */
+    private static void handleException(String msg) {
+        log.error(msg);
+        throw new SynapseException(msg);
+    }
+
+    /**
+     * Logs the message with its cause and throws a SynapseException wrapping both.
+     *
+     * @param msg description of the failure
+     * @param e   underlying cause
+     */
+    private static void handleException(String msg, Exception e) {
+        log.error(msg, e);
+        throw new SynapseException(msg, e);
+    }
+}
Added: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java?pathrev=24296
==============================================================================
--- (empty file)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java Wed Nov 19 23:52:41 2008
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.config.xml.eventing;
+
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.config.xml.XMLConfigConstants;
+import org.apache.synapse.eventing.EventSource;
+
+/**
+ * Writes an {@link EventSource} back to its XML form. The elements produced here are:
+ * <pre>
+ * &lt;eventSource name="blah"&gt;
+ *   &lt;subscriptionManager class="fully.qualified.ManagerClass"&gt;
+ *     &lt;property name="other" value="some text property"/&gt;
+ *   &lt;/subscriptionManager&gt;
+ * &lt;/eventSource&gt;
+ * </pre>
+ */
+public class EventSourceSerializer {
+
+    /**
+     * Serializes the given event source into an <code>eventSource</code> element.
+     * The <code>name</code> attribute is written only if the event source has a name, and
+     * the <code>subscriptionManager</code> child only if a subscription manager is set.
+     *
+     * @param elem        optional parent; when not null the new element is added to it
+     * @param eventSource the event source to serialize
+     * @return the newly created <code>eventSource</code> element
+     */
+    public static OMElement serializeEventSource(OMElement elem, EventSource eventSource) {
+
+        OMFactory factory = OMAbstractFactory.getOMFactory();
+        OMNamespace noNamespace = factory.createOMNamespace(XMLConfigConstants.NULL_NAMESPACE, "");
+
+        OMElement sourceElem
+                = factory.createOMElement("eventSource", XMLConfigConstants.SYNAPSE_OMNAMESPACE);
+
+        String sourceName = eventSource.getName();
+        if (sourceName != null) {
+            sourceElem.addAttribute(factory.createOMAttribute("name", noNamespace, sourceName));
+        }
+
+        if (eventSource.getSubscriptionManager() != null) {
+            sourceElem.addChild(subscriptionManagerElement(factory, noNamespace, eventSource));
+        }
+
+        if (elem != null) {
+            elem.addChild(sourceElem);
+        }
+        return sourceElem;
+    }
+
+    /**
+     * Creates the <code>subscriptionManager</code> element for the event source's manager:
+     * its class name as the <code>class</code> attribute plus one <code>property</code>
+     * child per property name the manager reports.
+     *
+     * @param factory     factory used to create the nodes
+     * @param noNamespace namespace used for the attributes
+     * @param eventSource event source whose (non-null) subscription manager is written
+     * @return the populated <code>subscriptionManager</code> element
+     */
+    private static OMElement subscriptionManagerElement(OMFactory factory, OMNamespace noNamespace,
+                                                        EventSource eventSource) {
+
+        OMElement managerElem = factory.createOMElement(
+                "subscriptionManager", XMLConfigConstants.SYNAPSE_OMNAMESPACE);
+        String managerClass = eventSource.getSubscriptionManager().getClass().getName();
+        managerElem.addAttribute(factory.createOMAttribute("class", noNamespace, managerClass));
+
+        for (String propertyName : eventSource.getSubscriptionManager().getPropertyNames()) {
+            OMElement propertyElem
+                    = factory.createOMElement("property", XMLConfigConstants.SYNAPSE_OMNAMESPACE);
+            propertyElem.addAttribute(
+                    factory.createOMAttribute("name", noNamespace, propertyName));
+            propertyElem.addAttribute(factory.createOMAttribute("value", noNamespace,
+                    eventSource.getSubscriptionManager().getPropertyValue(propertyName)));
+            managerElem.addChild(propertyElem);
+        }
+        return managerElem;
+    }
+}
\ No newline at end of file
Modified: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseInitializationModule.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseInitializationModule.java?rev=24296&r1=24295&r2=24296&view=diff
==============================================================================
--- branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseInitializationModule.java (original)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseInitializationModule.java Wed Nov 19 23:52:41 2008
@@ -33,6 +33,7 @@
import org.apache.synapse.ServerManager;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
+import org.apache.synapse.eventing.EventSource;
import org.apache.synapse.commons.util.datasource.DataSourceInformationRepositoryHelper;
import org.apache.synapse.config.SynapseConfiguration;
import org.apache.synapse.config.SynapseConfigurationBuilder;
@@ -150,6 +151,10 @@
}
}
+ for (EventSource eventSource : synCfg.getEventSources()) {
+ eventSource.buildService(axisCfg);
+ eventSource.getSubscriptionManager().init();
+ }
synCfg.init(configurationContext);
log.info("Synapse initialized successfully...!");
Added: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventFilter.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventFilter.java?pathrev=24296
==============================================================================
--- (empty file)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventFilter.java Wed Nov 19 23:52:41 2008
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.eventing;
+
+import org.apache.synapse.MessageContext;
+
+/**
+ * A condition that is evaluated against a message context.
+ * NOTE(review): presumably used to decide whether an event is delivered to a
+ * subscription - confirm against the implementations and callers.
+ */
+public interface EventFilter {
+
+    /**
+     * Evaluates this filter against the given message context.
+     *
+     * @param mc the message context to evaluate
+     * @return true if the filter is satisfied by the message context, false otherwise
+     */
+    boolean isSatisfied(MessageContext mc);
+}
Added: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventSource.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventSource.java?pathrev=24296
==============================================================================
--- (empty file)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventSource.java Wed Nov 19 23:52:41 2008
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.eventing;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.InOutAxisOperation;
+import org.apache.axis2.engine.AxisConfiguration;
+import org.apache.synapse.SynapseConstants;
+
+import javax.xml.namespace.QName;
+
+/**
+ * A named event source together with the subscription manager assigned to it. It can
+ * expose itself as an Axis2 service through {@link #buildService(AxisConfiguration)}.
+ */
+public class EventSource {
+
+    /** Name of this event source; also used as the name of the Axis2 service. */
+    private String name;
+
+    /** Subscription manager assigned to this event source; null until one is set. */
+    private SubscriptionManager subscriptionManager;
+
+    /**
+     * Creates an event source with the given name and no subscription manager.
+     *
+     * @param name name of the event source
+     */
+    public EventSource(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @return the name of this event source
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * @param name the new name of this event source
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @return the subscription manager of this event source, or null if none has been set
+     */
+    public SubscriptionManager getSubscriptionManager() {
+        return this.subscriptionManager;
+    }
+
+    /**
+     * @param subscriptionManager the subscription manager to use for this event source
+     */
+    public void setSubscriptionManager(SubscriptionManager subscriptionManager) {
+        this.subscriptionManager = subscriptionManager;
+    }
+
+    /**
+     * Creates an Axis2 service named after this event source and adds it to the given
+     * Axis2 configuration. The service has two in-out operations, the Synapse mediate
+     * operation and a "subscribe" operation whose SOAP action is
+     * {@link EventingConstants#SUBSCRIBE_ACTION}. Both are handled by one
+     * {@link EventingMessageReceiver} created for this event source.
+     *
+     * @param axisCfg the Axis2 configuration the service is added to
+     * @throws AxisFault if Axis2 rejects an operation or the service
+     */
+    public void buildService(AxisConfiguration axisCfg) throws AxisFault {
+
+        AxisService service = new AxisService();
+        service.setName(this.name);
+
+        EventingMessageReceiver eventingReceiver = new EventingMessageReceiver(this);
+
+        // The names of the in/out messages, which Axis2 needs to produce a WSDL
+        // (see SYNAPSE-366), are currently not set. The disabled calls were:
+        //   mediateOperation.getMessage(WSDLConstants.MESSAGE_LABEL_IN_VALUE).setName("in");
+        //   mediateOperation.getMessage(WSDLConstants.MESSAGE_LABEL_OUT_VALUE).setName("out");
+        AxisOperation mediate = new InOutAxisOperation(SynapseConstants.SYNAPSE_OPERATION_NAME);
+        mediate.setMessageReceiver(eventingReceiver);
+
+        AxisOperation subscribe = new InOutAxisOperation(new QName("subscribe"));
+        subscribe.setMessageReceiver(eventingReceiver);
+        subscribe.setSoapAction(EventingConstants.SUBSCRIBE_ACTION);
+
+        // operations are fully configured before they are attached to the service
+        service.addOperation(mediate);
+        service.addOperation(subscribe);
+
+        axisCfg.addService(service);
+    }
+}
Added: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventingConstants.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventingConstants.java?pathrev=24296
==============================================================================
--- (empty file)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventingConstants.java Wed Nov 19 23:52:41 2008
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.eventing;
+
+/**
+ * String constants used by the eventing implementation: the WS-Eventing (2004/08)
+ * namespace, the action URIs of the subscription-related messages, the push delivery
+ * mode URI and the Synapse-specific topic filter dialect URI.
+ */
+public class EventingConstants {
+
+ public static final String WS_EVENTING_NS = "http://schemas.xmlsoap.org/ws/2004/08/eventing";
+ public static final String SUBSCRIBE_ACTION = "http://schemas.xmlsoap.org/ws/2004/08/eventing/Subscribe";
+ public static final String SUBSCRIBE_RESPONSE_ACTION = "http://schemas.xmlsoap.org/ws/2004/08/eventing/SubscribeResponse";
+ public static final String SUBSCRIPTION_RENEW_ACTION = "http://schemas.xmlsoap.org/ws/2004/08/eventing/Renew";
+ public static final String SUBSCRIPTION_RENEW_RESPONSE_ACTION = "http://schemas.xmlsoap.org/ws/2004/08/eventing/RenewResponse";
+ public static final String SUBSCRIPTION_GET_STATUS_ACTION = "http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatus";
+ public static final String SUBSCRIPTION_GET_STATUS_RESPONSE_ACTION = "http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatusResponse";
+ public static final String UNSUBSCRIBE_ACTION = "http://schemas.xmlsoap.org/ws/2004/08/eventing/Unsubscribe";
+ public static final String UNSUBSCRIBE_RESPONSE_ACTION = "http://schemas.xmlsoap.org/ws/2004/08/eventing/UnsubscribeResponse";
+ public static final String SUBSCRIPTION_END_ACTION = "http://schemas.xmlsoap.org/ws/2004/08/eventing/SubscriptionEnd";
+
+ public static final String PUSH_DELIVERY_MODE = "http://schemas.xmlsoap.org/ws/2004/08/eventing/DeliveryModes/Push";
+ public static final String TOPIC_FILTER_DIALECT = "http://synapse.apache.org/eventing/dialect/topicFilter";
+}
Added: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventingMessageReceiver.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventingMessageReceiver.java?pathrev=24296
==============================================================================
--- (empty file)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventingMessageReceiver.java Wed Nov 19 23:52:41 2008
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.eventing;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.MessageContext;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.config.SynapseConfiguration;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.core.axis2.SynapseMessageReceiver;
+import org.apache.synapse.util.MessageHelper;
+
+import java.util.List;
+
+/**
+ * Message receiver of an event source service. A message whose WS-Addressing action is
+ * the WS-Eventing Subscribe action is turned into a subscription and stored in the
+ * subscription manager of the event source; every other message is treated as an event
+ * and a clone of it is sent to the endpoint of each matching subscription.
+ */
+public class EventingMessageReceiver extends SynapseMessageReceiver {
+
+    private EventSource eventSource;
+
+    /**
+     * @param eventSource the event source whose subscription manager this receiver uses
+     */
+    public EventingMessageReceiver(EventSource eventSource) {
+        this.eventSource = eventSource;
+    }
+
+    /**
+     * Handles a subscribe request or dispatches an event to the matching subscribers.
+     *
+     * @param mc the incoming Axis2 message context
+     * @throws AxisFault if the event source has no subscription manager, if no subscription
+     *                   could be created from a subscribe request, or if cloning the message
+     *                   for a subscriber fails
+     */
+    public void receive(MessageContext mc) throws AxisFault {
+        // The manager is optional on EventSource (only the name is set by its constructor),
+        // so fail with a descriptive fault instead of a NullPointerException.
+        SubscriptionManager manager = eventSource.getSubscriptionManager();
+        if (manager == null) {
+            throw new AxisFault("No subscription manager is configured for the event source : "
+                    + eventSource.getName());
+        }
+
+        SynapseConfiguration synCfg = (SynapseConfiguration) mc.getConfigurationContext()
+                .getAxisConfiguration().getParameter(SynapseConstants.SYNAPSE_CONFIG).getValue();
+        SynapseEnvironment synEnv = (SynapseEnvironment) mc.getConfigurationContext()
+                .getAxisConfiguration().getParameter(SynapseConstants.SYNAPSE_ENV).getValue();
+        org.apache.synapse.MessageContext smc = new Axis2MessageContext(mc, synCfg, synEnv);
+
+        if (EventingConstants.SUBSCRIBE_ACTION.equals(mc.getWSAAction())) {
+            Subscription subscription = SubscriptionFactory.createSubscription(smc);
+            if (subscription == null) {
+                throw new AxisFault(
+                        "Unable to create a subscription from the received subscribe request");
+            }
+            manager.addSubscription(subscription);
+        } else {
+            List<Subscription> subscribers = manager.getMatchingSubscribers(smc);
+            for (Subscription subscription : subscribers) {
+                // Each subscriber gets its own copy of the event message.
+                subscription.getEndpoint().send(MessageHelper.cloneMessageContext(smc));
+            }
+        }
+    }
+
+    /**
+     * @return the event source this receiver works for
+     */
+    public EventSource getEventSource() {
+        return eventSource;
+    }
+
+    /**
+     * @param eventSource the event source this receiver should work for
+     */
+    public void setEventSource(EventSource eventSource) {
+        this.eventSource = eventSource;
+    }
+}
Added: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/Subscription.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/Subscription.java?pathrev=24296
==============================================================================
--- (empty file)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/Subscription.java Wed Nov 19 23:52:41 2008
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.eventing;
+
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.util.UUIDGenerator;
+
+import java.util.Date;
+
+/**
+ * Holds the data of a single event subscription: a generated id, the delivery mode, the
+ * endpoint events are sent to, an optional event filter and an optional expiry date.
+ */
+public class Subscription {
+
+    private String id;
+    private String deliveryMode;
+    private Endpoint endpoint;
+    private EventFilter filter;
+    private Date expires;
+
+    /**
+     * Creates a subscription with a freshly generated id and the push delivery mode.
+     */
+    public Subscription() {
+        this(EventingConstants.PUSH_DELIVERY_MODE);
+    }
+
+    /**
+     * Creates a subscription with a freshly generated id and the given delivery mode.
+     *
+     * @param deliveryMode the delivery mode URI of this subscription
+     */
+    public Subscription(String deliveryMode) {
+        this.deliveryMode = deliveryMode;
+        this.id = UUIDGenerator.getUUID();
+    }
+
+    /** @return the event filter of this subscription, or null if none has been set */
+    public EventFilter getFilter() {
+        return this.filter;
+    }
+
+    /** @param filter the event filter to use for this subscription */
+    public void setFilter(EventFilter filter) {
+        this.filter = filter;
+    }
+
+    /** @return the endpoint of this subscription, or null if none has been set */
+    public Endpoint getEndpoint() {
+        return this.endpoint;
+    }
+
+    /** @param endpoint the endpoint of this subscription */
+    public void setEndpoint(Endpoint endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    /** @return the delivery mode URI of this subscription */
+    public String getDeliveryMode() {
+        return this.deliveryMode;
+    }
+
+    /** @param deliveryMode the delivery mode URI of this subscription */
+    public void setDeliveryMode(String deliveryMode) {
+        this.deliveryMode = deliveryMode;
+    }
+
+    /** @return the id of this subscription */
+    public String getId() {
+        return this.id;
+    }
+
+    /** @param id the id replacing the one generated at construction time */
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    /** @return the expiry date of this subscription, or null if none has been set */
+    public Date getExpires() {
+        return this.expires;
+    }
+
+    /** @param expires the expiry date of this subscription */
+    public void setExpires(Date expires) {
+        this.expires = expires;
+    }
+}
Added: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/SubscriptionFactory.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/SubscriptionFactory.java?pathrev=24296
==============================================================================
--- (empty file)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/SubscriptionFactory.java Wed Nov 19 23:52:41 2008
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.eventing;
+
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.config.xml.SynapseXPathFactory;
+import org.apache.synapse.config.xml.XMLConfigConstants;
+import org.apache.synapse.endpoints.AddressEndpoint;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.endpoints.EndpointDefinition;
+import org.apache.synapse.eventing.filters.XPathBasedEventFilter;
+import org.apache.synapse.util.xpath.SynapseXPath;
+import org.jaxen.JaxenException;
+
+import javax.xml.namespace.QName;
+
+/**
+ *
+ */
+public class SubscriptionFactory {
+
+ private static final Log log = LogFactory.getLog(SubscriptionFactory.class);
+
+ private static final QName SUBSCRIBE_QNAME = new QName(EventingConstants.WS_EVENTING_NS, "Subscribe");
+ private static final QName DELIVERY_QNAME = new QName(EventingConstants.WS_EVENTING_NS, "Delivery");
+ private static final QName FILTER_QNAME = new QName(EventingConstants.WS_EVENTING_NS, "Filter");
+ private static final QName NOTIFY_TO_QNAME = new QName(EventingConstants.WS_EVENTING_NS, "NotifyTo");
+ private static final QName ATT_DIALECT = new QName(XMLConfigConstants.NULL_NAMESPACE, "Dialect");
+ private static final QName ATT_XPATH = new QName(XMLConfigConstants.NULL_NAMESPACE, "XPath");
+
+ public static Subscription createSubscription(MessageContext mc) {
+ Subscription subscription = null;
+ OMElement elem = mc.getEnvelope().getBody().getFirstChildWithName(SUBSCRIBE_QNAME);
+ if (elem != null) {
+
+ OMElement deliveryElem = elem.getFirstChildWithName(DELIVERY_QNAME);
+ if (deliveryElem != null) {
+ OMElement notifyToElem = deliveryElem.getFirstChildWithName(NOTIFY_TO_QNAME);
+ if (notifyToElem != null) {
+ Endpoint ep = getEndpointFromWSAAddress(notifyToElem.getFirstElement());
+ if (ep != null) {
+ subscription = new Subscription(EventingConstants.PUSH_DELIVERY_MODE);
+ subscription.setEndpoint(ep);
+ }
+ } else {
+ handleException("NotifyTo element not found in the subscription message");
+ }
+ } else {
+ handleException("Delivery element is not found in the subscription message");
+ }
+
+ OMElement filterElem = elem.getFirstChildWithName(FILTER_QNAME);
+ if (subscription != null && filterElem != null) {
+ OMAttribute dialectAttr = filterElem.getAttribute(ATT_DIALECT);
+ if (dialectAttr != null && dialectAttr.getAttributeValue() != null) {
+ if (EventingConstants.TOPIC_FILTER_DIALECT.equals(dialectAttr.getAttributeValue())) {
+ XPathBasedEventFilter filter = new XPathBasedEventFilter();
+ filter.setResultValue(filterElem.getText());
+ if (filterElem.getAttribute(ATT_XPATH) != null) {
+ try {
+ SynapseXPath xpath = SynapseXPathFactory.getSynapseXPath(filterElem, ATT_XPATH);
+ filter.setSourceXpath(xpath);
+ } catch (JaxenException e) {
+ handleException("Unable to create the EventFilter xpath", e);
+ }
+ }
+ subscription.setFilter(filter);
+ }
+ } else {
+ handleException("Error in creating subscription. Filter dialect not defined");
+ }
+ }
+ } else {
+ handleException("Subscribe element is required as the payload of the subscription message");
+ }
+ return subscription;
+ }
+
+ private static Endpoint getEndpointFromWSAAddress(OMElement address) {
+ AddressEndpoint endpoint = new AddressEndpoint();
+ EndpointDefinition def = new EndpointDefinition();
+ def.setAddress(address.getText().trim());
+ endpoint.setDefinition(def);
+ return endpoint;
+ }
+
+ private static void handleException(String message) {
+ log.error(message);
+ throw new SynapseException(message);
+ }
+
+ private static void handleException(String message, Exception e) {
+ log.error(message, e);
+ throw new SynapseException(message, e);
+ }
+}
Added: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/SubscriptionManager.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/SubscriptionManager.java?pathrev=24296
==============================================================================
--- (empty file)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/SubscriptionManager.java Wed Nov 19 23:52:41 2008
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.eventing;
+
+import org.apache.synapse.MessageContext;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class of the subscription stores used by an event source. Subclasses implement the
+ * storage and matching of subscriptions; this class only keeps the name/value properties
+ * that configure the manager.
+ */
+public abstract class SubscriptionManager {
+
+    private final Map<String, String> properties = new HashMap<String, String>();
+
+    /** @return the subscriptions held by this manager */
+    public abstract List<Subscription> getSubscribers();
+
+    /**
+     * @param mc the event message
+     * @return the subscriptions that should receive the given message
+     */
+    public abstract List<Subscription> getMatchingSubscribers(MessageContext mc);
+
+    /**
+     * @param id a subscription id
+     * @return the subscription with that id
+     */
+    public abstract Subscription getSubscription(String id);
+
+    /**
+     * @param subs the subscription to store
+     * @return a string identifying the stored subscription (the in-memory manager returns its id)
+     */
+    public abstract String addSubscription(Subscription subs);
+
+    /**
+     * @param id the id of the subscription to remove
+     * @return whether a subscription was removed
+     */
+    public abstract boolean deleteSubscription(String id);
+
+    /** Initialization hook for implementations. */
+    public abstract void init();
+
+    /**
+     * Sets a configuration property, replacing any previous value of the same name.
+     *
+     * @param name  the property name
+     * @param value the property value
+     */
+    public void addProperty(String name, String value) {
+        properties.put(name, value);
+    }
+
+    /**
+     * @return a read-only view of the names of all configured properties; the view follows
+     *         later calls of {@link #addProperty(String, String)} but cannot be used to
+     *         remove properties
+     */
+    public Collection<String> getPropertyNames() {
+        // Do not hand out the live key set: removing from it would silently drop properties.
+        return java.util.Collections.unmodifiableSet(properties.keySet());
+    }
+
+    /**
+     * @param name the property name
+     * @return the value of the property, or null if it is not set
+     */
+    public String getPropertyValue(String name) {
+        return properties.get(name);
+    }
+}
Added: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/filters/XPathBasedEventFilter.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/filters/XPathBasedEventFilter.java?pathrev=24296
==============================================================================
--- (empty file)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/filters/XPathBasedEventFilter.java Wed Nov 19 23:52:41 2008
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.eventing.filters;
+
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.eventing.EventFilter;
+import org.apache.synapse.util.xpath.SynapseXPath;
+
+/**
+ * Event filter that evaluates an XPath expression against a message and accepts the
+ * message when the resulting string equals the configured result value.
+ */
+public class XPathBasedEventFilter implements EventFilter {
+
+    private SynapseXPath sourceXpath;
+    private String resultValue;
+
+    /**
+     * Checks whether the given message passes this filter.
+     *
+     * @param mc the message to test
+     * @return true if the string value of the source XPath on the message equals the result
+     *         value; false otherwise, including when no source XPath has been set
+     */
+    public boolean isSatisfied(MessageContext mc) {
+        // The XPath is optional when the filter is built from a subscribe request; without
+        // it nothing can be evaluated, so treat the filter as not satisfied instead of
+        // failing the whole event dispatch with a NullPointerException.
+        if (sourceXpath == null) {
+            return false;
+        }
+        String evaluatedValue = sourceXpath.stringValueOf(mc);
+        // Guard against a null evaluation result as well.
+        return evaluatedValue != null && evaluatedValue.equals(resultValue);
+    }
+
+    /** @return the XPath evaluated against each message, or null if none has been set */
+    public SynapseXPath getSourceXpath() {
+        return sourceXpath;
+    }
+
+    /** @param sourceXpath the XPath to evaluate against each message */
+    public void setSourceXpath(SynapseXPath sourceXpath) {
+        this.sourceXpath = sourceXpath;
+    }
+
+    /** @return the value the XPath result is compared with */
+    public String getResultValue() {
+        return resultValue;
+    }
+
+    /** @param resultValue the value the XPath result is compared with */
+    public void setResultValue(String resultValue) {
+        this.resultValue = resultValue;
+    }
+}
Added: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java?pathrev=24296
==============================================================================
--- (empty file)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java Wed Nov 19 23:52:41 2008
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.eventing.managers;
+
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.eventing.EventFilter;
+import org.apache.synapse.eventing.Subscription;
+import org.apache.synapse.eventing.SubscriptionManager;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Subscription manager that keeps all subscriptions in an in-memory map keyed by the
+ * subscription id.
+ */
+public class DefaultInMemorySubscriptionManager extends SubscriptionManager {
+
+    Map<String, Subscription> store = new ConcurrentHashMap<String, Subscription>();
+
+    /**
+     * Stores the subscription under its id, generating an id first if it has none. A
+     * subscription already stored under the same id is replaced.
+     *
+     * @param subs the subscription to store
+     * @return the id the subscription is stored under
+     */
+    public String addSubscription(Subscription subs) {
+        if (subs.getId() == null) {
+            subs.setId(org.apache.axiom.om.util.UUIDGenerator.getUUID());
+        }
+        store.put(subs.getId(), subs);
+        return subs.getId();
+    }
+
+    /**
+     * Removes the subscription with the given id.
+     *
+     * @param id the id of the subscription to remove; may be null
+     * @return true if a subscription was stored under the id and has been removed
+     */
+    public boolean deleteSubscription(String id) {
+        // A single remove() avoids the containsKey/remove race; the null check is needed
+        // because ConcurrentHashMap throws NullPointerException for null keys.
+        return id != null && store.remove(id) != null;
+    }
+
+    /**
+     * @return a new list holding all stored subscriptions
+     */
+    public List<Subscription> getSubscribers() {
+        return new LinkedList<Subscription>(store.values());
+    }
+
+    /**
+     * Collects the subscriptions that either have no filter or whose filter is satisfied
+     * by the given message.
+     *
+     * @param mc the event message
+     * @return a new list holding the matching subscriptions
+     */
+    public List<Subscription> getMatchingSubscribers(MessageContext mc) {
+        List<Subscription> matching = new LinkedList<Subscription>();
+        for (Subscription subscription : store.values()) {
+            EventFilter filter = subscription.getFilter();
+            if (filter == null || filter.isSatisfied(mc)) {
+                matching.add(subscription);
+            }
+        }
+        return matching;
+    }
+
+    /**
+     * @param id the id of the wanted subscription; may be null
+     * @return the subscription stored under the id, or null if there is none
+     */
+    public Subscription getSubscription(String id) {
+        // ConcurrentHashMap throws NullPointerException for null keys.
+        return id == null ? null : store.get(id);
+    }
+
+    public void init() {
+        // nothing to do at the inmemory subscriber initialization
+    }
+}
Modified: branches/synapse/1.2.wso2v1/repository/conf/synapse.xml
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/repository/conf/synapse.xml?rev=24296&r1=24295&r2=24296&view=diff
==============================================================================
--- branches/synapse/1.2.wso2v1/repository/conf/synapse.xml (original)
+++ branches/synapse/1.2.wso2v1/repository/conf/synapse.xml Wed Nov 19 23:52:41 2008
@@ -19,18 +19,14 @@
<!-- A simple Synapse configuration -->
<definitions xmlns="http://ws.apache.org/ns/synapse">
- <in>
- <!-- Log all messages passing through -->
- <log level="full"/>
-
- <!-- ensure that the default configuration only sends if it is one of samples -->
- <!-- Otherwise Synapse would be an open proxy by default (BAD!) -->
- <filter source="get-property('To')" regex="http://localhost:9000.*">
- <!-- Send the messages where they have been sent (i.e. implicit "To" EPR) -->
- <send/>
- </filter>
- </in>
- <out>
- <send/>
- </out>
-</definitions>
\ No newline at end of file
+ <eventSource name="eventing">
+ <subscriptionManager class="org.apache.synapse.eventing.managers.RegistryBasedSubscriptionManager">
+ <property name="registryURL" value="http://localhost:8180/wso2registry"/>
+ <property name="username" value="admin"/>
+ <property name="password" value="admin"/>
+ <property name="topicHeaderName" value="Topic"/>
+ <property name="topicHeaderNS" value="http://wso2.com/aip"/>
+ </subscriptionManager>
+ </eventSource>
+ <log level="full"/>
+</definitions>
More information about the Esb-java-dev
mailing list