[esb-java-dev] svn commit r24619 - branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing
asankaa at wso2.com
asankaa at wso2.com
Sun Nov 23 08:57:27 PST 2008
Author: asankaa
Date: Sun Nov 23 08:57:27 2008
New Revision: 24619
URL: http://wso2.org/svn/browse/wso2?view=rev&revision=24619
Log:
Removed the message receiver and made EventSource the core message receiver
Removed:
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventingMessageReceiver.java
Modified:
branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventSource.java
Modified: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventSource.java
URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventSource.java?rev=24619&r1=24618&r2=24619&view=diff
==============================================================================
--- branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventSource.java (original)
+++ branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventSource.java Sun Nov 23 08:57:27 2008
@@ -20,21 +20,35 @@
package org.apache.synapse.eventing;
import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.MessageContext;
import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.description.AxisService;
import org.apache.axis2.description.InOutAxisOperation;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.util.MessageHelper;
+import org.apache.synapse.endpoints.AddressEndpoint;
+import org.apache.synapse.endpoints.EndpointDefinition;
+import org.apache.synapse.eventing.builders.SubscriptionMessageBuilder;
+import org.apache.synapse.eventing.builders.ResponseMessageBuilder;
+import org.apache.synapse.config.SynapseConfiguration;
+import org.apache.synapse.core.axis2.SynapseMessageReceiver;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.axiom.soap.SOAPEnvelope;
import org.wso2.eventing.EventingConstants;
import javax.xml.namespace.QName;
+import java.util.List;
/**
*
*/
-public class EventSource {
+public class EventSource extends SynapseMessageReceiver {
private String name;
private SubscriptionManager subscriptionManager;
+ private EventSource eventSource;
public EventSource(String name) {
this.name = name;
@@ -64,13 +78,64 @@
// Set the names of the two messages so that Axis2 is able to produce a WSDL (see SYNAPSE-366):
// mediateOperation.getMessage(WSDLConstants.MESSAGE_LABEL_IN_VALUE).setName("in");
// mediateOperation.getMessage(WSDLConstants.MESSAGE_LABEL_OUT_VALUE).setName("out");
- EventingMessageReceiver receiver = new EventingMessageReceiver(this);
- mediateOperation.setMessageReceiver(receiver);
- subscribeOperation.setMessageReceiver(receiver);
+ this.setEventSource(this);
+ mediateOperation.setMessageReceiver(this);
+ subscribeOperation.setMessageReceiver(this);
subscribeOperation.setSoapAction(EventingConstants.WSE_SUBSCRIBE);
eventSourceService.addOperation(mediateOperation);
eventSourceService.addOperation(subscribeOperation);
axisCfg.addService(eventSourceService);
}
+
+ /**
+ * Returns the EventSource held in the eventSource field.
+ * @return the value of the eventSource field
+ */
+ public EventSource getEventSource() {
+ return eventSource;
+ }
+
+ /**
+ * Stores the given EventSource in the eventSource field.
+ * @param eventSource the EventSource to store
+ */
+ public void setEventSource(EventSource eventSource) {
+ this.eventSource = eventSource;
+ }
+ public void receive(MessageContext mc) throws AxisFault {
+ SynapseConfiguration synCfg = (SynapseConfiguration) mc.getConfigurationContext()
+ .getAxisConfiguration().getParameter(SynapseConstants.SYNAPSE_CONFIG).getValue();
+ SynapseEnvironment synEnv = (SynapseEnvironment) mc.getConfigurationContext()
+ .getAxisConfiguration().getParameter(SynapseConstants.SYNAPSE_ENV).getValue();
+ org.apache.synapse.MessageContext smc = new Axis2MessageContext(mc, synCfg, synEnv);
+ if (EventingConstants.WSE_SUBSCRIBE.equals(mc.getWSAAction())) {
+ Subscription subscription = SubscriptionMessageBuilder.createSubscription(smc);
+ eventSource.getSubscriptionManager().addSubscription(subscription);
+ ResponseMessageBuilder messageBuilder = new ResponseMessageBuilder(mc);
+ SOAPEnvelope soapEnvelope = messageBuilder.genSubscriptionResponse(subscription);
+ String replyAddress = mc.getOptions().getReplyTo().getAddress(); // NOTE(review): getReplyTo() is dereferenced without a null check - confirm it is always set
+ AddressEndpoint endpoint = new AddressEndpoint();
+ EndpointDefinition def = new EndpointDefinition();
+ def.setAddress(replyAddress.trim());
+ endpoint.setDefinition(def);
+ org.apache.synapse.MessageContext rmc = new Axis2MessageContext(mc, synCfg, synEnv);
+ rmc.setTo(new EndpointReference(replyAddress));
+ rmc.setEnvelope(soapEnvelope);
+ endpoint.send(MessageHelper.cloneMessageContext(rmc));
+ }else if (EventingConstants.WSE_UNSUBSCRIBE.equals(mc.getWSAAction())) {
+ // Unsubscribe request: no handling in this branch yet
+ }else if (EventingConstants.WSE_GET_STATUS.equals(mc.getWSAAction())) {
+ // GetStatus request: no handling in this branch yet
+ }else if (EventingConstants.WSE_RENEW.equals(mc.getWSAAction())) {
+ // Renew request: no handling in this branch yet
+ } else {
+ // Any other action: treat the message as an event and send a clone to each matching subscriber
+ List<Subscription> subscribers = eventSource.getSubscriptionManager().getMatchingSubscribers(smc);
+ for (Subscription subscription : subscribers) {
+ smc.setProperty("OUT_ONLY","true"); // Set one way message for events
+ subscription.getEndpoint().send(MessageHelper.cloneMessageContext(smc));
+ }
+ }
+ }
}
Deleted: branches/synapse/1.2.wso2v1/modules/core/src/main/java/org/apache/synapse/eventing/EventingMessageReceiver.java
URL: http://wso2.org/svn/browse/wso2/None?pathrev=24618
More information about the Esb-java-dev
mailing list