http server: do not terminate old thread when threads limit reached

Original implementation of the thread pool was limiting max number of
threads by terminating oldest thread if the limit was already reached and
a new request has arrived. This was causing broken responses under load.
Instead we now wait until one of the threads completes before starting a
new one.
This commit is contained in:
Sergej Orlov 2020-02-02 16:19:56 +01:00
parent 179525712f
commit 24d0e97c34

View file

@ -15,6 +15,7 @@ import java.net.Socket;
import java.net.URLDecoder; import java.net.URLDecoder;
import java.text.DateFormat; import java.text.DateFormat;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.concurrent.Semaphore;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -46,14 +47,8 @@ public class RouteServer extends Thread
private Socket clientSocket = null; private Socket clientSocket = null;
private RoutingEngine cr = null; private RoutingEngine cr = null;
private volatile boolean terminated; private Semaphore semaphore = null;
public void stopRouter()
{
RoutingEngine e = cr;
if ( e != null ) e.terminate();
}
private static DateFormat tsFormat = new SimpleDateFormat( "dd.MM.yy HH:mm", new Locale( "en", "US" ) ); private static DateFormat tsFormat = new SimpleDateFormat( "dd.MM.yy HH:mm", new Locale( "en", "US" ) );
private static String formattedTimestamp() private static String formattedTimestamp()
@ -270,7 +265,7 @@ public class RouteServer extends Thread
if ( br != null ) try { br.close(); } catch( Exception e ) {} if ( br != null ) try { br.close(); } catch( Exception e ) {}
if ( bw != null ) try { bw.close(); } catch( Exception e ) {} if ( bw != null ) try { bw.close(); } catch( Exception e ) {}
if ( clientSocket != null ) try { clientSocket.close(); } catch( Exception e ) {} if ( clientSocket != null ) try { clientSocket.close(); } catch( Exception e ) {}
terminated = true; semaphore.release();
} }
} }
@ -294,8 +289,7 @@ public class RouteServer extends Thread
serviceContext.sharedProfileDir = tk.hasMoreTokens() ? tk.nextToken() : serviceContext.customProfileDir; serviceContext.sharedProfileDir = tk.hasMoreTokens() ? tk.nextToken() : serviceContext.customProfileDir;
int maxthreads = Integer.parseInt( args[4] ); int maxthreads = Integer.parseInt( args[4] );
Semaphore semaphore = new Semaphore(maxthreads, true);
TreeMap<Long,RouteServer> threadMap = new TreeMap<Long,RouteServer>();
ServerSocket serverSocket = args.length > 5 ? new ServerSocket(Integer.parseInt(args[3]),50,InetAddress.getByName(args[5])) : new ServerSocket(Integer.parseInt(args[3])); ServerSocket serverSocket = args.length > 5 ? new ServerSocket(Integer.parseInt(args[3]),50,InetAddress.getByName(args[5])) : new ServerSocket(Integer.parseInt(args[3]));
@ -318,36 +312,8 @@ public class RouteServer extends Thread
RouteServer server = new RouteServer(); RouteServer server = new RouteServer();
server.serviceContext = serviceContext; server.serviceContext = serviceContext;
server.clientSocket = clientSocket; server.clientSocket = clientSocket;
server.semaphore = semaphore;
// cleanup thread list semaphore.acquire();
for(;;)
{
boolean removedItem = false;
for (Map.Entry<Long,RouteServer> e : threadMap.entrySet())
{
if ( e.getValue().terminated )
{
threadMap.remove( e.getKey() );
removedItem = true;
break;
}
}
if ( !removedItem ) break;
}
// kill thread if limit reached
if ( threadMap.size() >= maxthreads )
{
Long k = threadMap.firstKey();
RouteServer victim = threadMap.get( k );
threadMap.remove( k );
victim.stopRouter();
}
long ts = System.currentTimeMillis();
while ( ts <= last_ts ) ts++;
threadMap.put( Long.valueOf( ts ), server );
last_ts = ts;
server.start(); server.start();
} }
} }