001/*
002 * JPPF.
003 * Copyright (C) 2005-2018 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.jppf.serialization;
020
021import java.io.*;
022import java.util.*;
023
024import org.jppf.JPPFError;
025import org.jppf.utils.*;
026import org.jppf.utils.configuration.*;
027import org.slf4j.*;
028
029/**
030 * Interface and factory for object serialization and deserialization schemes in JPPF.
031 * <p>A serialization scheme is defined as an implementation of this interface,
032 * and configured in a JPPF configuration file using the property definition:<br>
033 * <code>jppf.object.serialization.class = my.implementation.OfJPPFSerialization</code>
034 * <p>The same implementation must be used for all nodes, servers and clients.
035 * @author Laurent Cohen
036 */
037public interface JPPFSerialization {
038  /**
039   * Serialize an object into the specified output stream.
040   * @param o the object to serialize.
041   * @param os the stream that receives the serialized object.
042   * @throws Exception if any error occurs.
043   */
044  void serialize(Object o, OutputStream os) throws Exception;
045
046  /**
047   * Deserialize an object from the specified input stream.
048   * @param is the stream from which to deserialize.
049   * @return the serialized object.
050   * @throws Exception if any error occurs.
051   */
052  Object deserialize(InputStream is) throws Exception;
053
054  /**
055   * Factory class for instantiating a default or configured serialization.
056   */
057  public static class Factory {
058    /**
059     * Logger for this class.
060     */
061    private static Logger log = LoggerFactory.getLogger(Factory.class);
062    /**
063     * Determines whether the debug level is enabled in the log configuration, without the cost of a method call.
064     */
065    private static boolean debugEnabled = LoggingUtils.isDebugEnabled(log);
066    /**
067     * The class of the serialization to use.
068     */
069    private static Class<? extends JPPFSerialization> serializationClass = null;
070    /**
071     * The class of the composite to use on top of the serialization.
072     */
073    private final static Map<String, Class<? extends JPPFCompositeSerialization>> compositeMap = new HashMap<>();
074    /**
075     * The class of the composite to use on top of the serialization.
076     */
077    private final static List<Class<? extends JPPFCompositeSerialization>> compositeClasses = new ArrayList<>();
078    static {
079      init();
080      configure();
081    }
082
083    /**
084     * Initialize the serialization.
085     */
086    private static void init() {
087      final List<Class<? extends JPPFCompositeSerialization>> list = new ServiceFinder().findProviderClassess(JPPFCompositeSerialization.class, null, false);
088      for (final Class<? extends JPPFCompositeSerialization> c: list) {
089        try {
090          final JPPFCompositeSerialization jcs = c.newInstance();
091          compositeMap.put(jcs.getName().toUpperCase(), c);
092        } catch (final Exception e) {
093          final StringBuilder sb = new StringBuilder("Could not instantiate composite serialization '");
094          sb.append(c.getName()).append("', terminating this application");
095          log.error(sb.toString(), e);
096          throw new JPPFError(sb.toString(), e);
097        }
098      }
099      if (debugEnabled) log.debug("compositeMap = {}", compositeMap);
100    }
101
102    /**
103     * Initialize the serialization.
104     */
105    @SuppressWarnings("unchecked")
106    private static void configure() {
107      final JPPFProperty<String> prop = JPPFProperties.OBJECT_SERIALIZATION_CLASS;
108      String className = null;
109      final String value  = JPPFConfiguration.get(prop);
110      if (value != null) {
111        final String[] elts = RegexUtils.SPACES_PATTERN.split(value);
112        if (elts.length == 1) className = elts[0];
113        else if (elts.length >= 2) {
114          for (int i=0; i<elts.length-1; i++) {
115            final String name = elts[i].toUpperCase();
116            final Class<? extends JPPFCompositeSerialization> c = compositeMap.get(name);
117            compositeClasses.add(c);
118          }
119          className = elts[elts.length - 1];
120        }
121      }
122      if (debugEnabled) log.debug("found " + prop.getName() + " = " + className);
123      if (className != null) {
124        if (debugEnabled) log.debug("serializationClass={}, compositeClasses={}, compositeMap={}", className, compositeClasses, compositeMap);
125        try {
126          serializationClass = (Class<? extends JPPFSerialization>) Class.forName(className);
127        } catch (final Exception e) {
128          final StringBuilder sb = new StringBuilder("Could not instantiate JPPF serialization [");
129          sb.append(prop.getName()).append(" = ").append(className);
130          sb.append(", terminating this application");
131          log.error(sb.toString(), e);
132          throw new JPPFError(sb.toString(), e);
133        }
134      } else {
135        if (debugEnabled) log.debug("using DefaultJavaSerialization");
136        serializationClass = DefaultJavaSerialization.class;
137      }
138    }
139
140    /**
141     * Get the configured serialization.
142     * @return an instance of {@link JPPFSerialization}.
143     */
144    public static JPPFSerialization getSerialization() {
145      try {
146        final JPPFSerialization serialization = serializationClass.newInstance();
147        JPPFCompositeSerialization composite = null;
148        JPPFCompositeSerialization prev = null;
149        for (Class<? extends JPPFCompositeSerialization> c: compositeClasses) {
150          final JPPFCompositeSerialization tmp = c.newInstance();
151          if (composite == null) composite = tmp;
152          if (prev != null) prev.delegateTo(tmp);
153          prev = tmp;
154        }
155        if (prev != null) prev.delegateTo(serialization);
156        return (composite == null) ? serialization : composite;
157      } catch (final Exception e) {
158        final String msg = String.format("error instantiating serialization scheme '%s'", JPPFConfiguration.get(JPPFProperties.OBJECT_SERIALIZATION_CLASS));
159        log.error(msg + " :\n{}", ExceptionUtils.getStackTrace(e));
160        throw new JPPFError(msg, e);
161      }
162    }
163
164    /**
165     * Reset the configured serialization.
166     * @exclude
167     */
168    public static void reset() {
169      serializationClass = null;
170      compositeClasses.clear();
171      configure();
172      if (debugEnabled) log.debug("serialization = {}, composite = {}", serializationClass, compositeClasses);
173    }
174  }
175}